Combine 响应式流

掌握声明式异步事件处理。深入理解 Publisher、Subscriber 及常用操作符在复杂业务中的应用。

Apple 官方的响应式框架,用声明式方式处理异步事件流,是 SwiftUI 数据流的底层支撑。


1. 核心概念

Combine 框架基于发布-订阅模式,由三个核心组件构成:

核心三要素

组件 作用 类比
Publisher 数据的生产者 水龙头
Subscriber 数据的消费者 水杯
Operator 数据的转换器 过滤器/净水器

数据流示意

Publisher → Operator → Operator → Subscriber
   ↓           ↓           ↓          ↓
  产生数据    过滤      转换格式     接收使用

2. Publisher 发布者

内置 Publisher 类型

// Just - 发送单个值后完成
let just = Just("Hello")

// Future - 异步产生单个值
let future = Future<String, Error> { promise in
    DispatchQueue.global().async {
        promise(.success("异步结果"))
    }
}

// PassthroughSubject - 手动发送值
let subject = PassthroughSubject<String, Never>()
subject.send("Hello")
subject.send("World")

// CurrentValueSubject - 带初始值,可获取当前值
let currentValue = CurrentValueSubject<Int, Never>(0)
print(currentValue.value)  // 0
currentValue.send(1)
print(currentValue.value)  // 1

@Published 属性包装器

class UserSettings: ObservableObject {
    @Published var username = ""
    @Published var isLoggedIn = false
}

let settings = UserSettings()

// $username 是一个 Publisher
settings.$username
    .sink { name in
        print("用户名变更: \(name)")
    }

settings.username = "张三"  // 输出: 用户名变更: 张三

3. Subscriber 订阅者

sink - 最常用的订阅方式

let publisher = [1, 2, 3, 4, 5].publisher

// 处理值和完成事件
let cancellable = publisher
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("完成")
            case .failure(let error):
                print("错误: \(error)")
            }
        },
        receiveValue: { value in
            print("收到: \(value)")
        }
    )

assign - 直接赋值给属性

class ViewModel {
    var text: String = ""
}

let vm = ViewModel()
let publisher = Just("Hello World")

let cancellable = publisher
    .assign(to: \.text, on: vm)

print(vm.text)  // "Hello World"

管理订阅生命周期

class MyViewController: UIViewController {
    private var cancellables = Set<AnyCancellable>()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        
        somePublisher
            .sink { value in
                // 处理值
            }
            .store(in: &cancellables)  // 自动管理生命周期
    }
}

4. Operator 操作符

转换操作符

// map - 转换值
[1, 2, 3].publisher
    .map { $0 * 2 }
    .sink { print($0) }  // 2, 4, 6

// flatMap - 展开嵌套 Publisher
struct User { let name: String }

func fetchUser(id: Int) -> AnyPublisher<User, Error> {
    // 返回用户的 Publisher
}

[1, 2, 3].publisher
    .flatMap { id in
        fetchUser(id: id)
    }
    .sink(receiveCompletion: { _ in },
          receiveValue: { user in print(user.name) })

// compactMap - 过滤 nil
["1", "two", "3"].publisher
    .compactMap { Int($0) }
    .sink { print($0) }  // 1, 3

过滤操作符

// filter - 条件过滤
[1, 2, 3, 4, 5].publisher
    .filter { $0 % 2 == 0 }
    .sink { print($0) }  // 2, 4

// removeDuplicates - 去重
[1, 1, 2, 2, 3, 1].publisher
    .removeDuplicates()
    .sink { print($0) }  // 1, 2, 3, 1

// first / last
[1, 2, 3].publisher
    .first()
    .sink { print($0) }  // 1

// debounce - 防抖(常用于搜索)
searchTextField.publisher
    .debounce(for: .milliseconds(300), scheduler: RunLoop.main)
    .sink { query in
        performSearch(query)
    }

// throttle - 节流
scrollView.publisher
    .throttle(for: .milliseconds(100), scheduler: RunLoop.main, latest: true)
    .sink { offset in
        updateUI(offset)
    }

组合操作符

// combineLatest - 组合多个 Publisher 的最新值
let name = PassthroughSubject<String, Never>()
let age = PassthroughSubject<Int, Never>()

Publishers.CombineLatest(name, age)
    .sink { name, age in
        print("\(name), \(age)岁")
    }

name.send("张三")
age.send(25)  // 输出: 张三, 25岁
age.send(26)  // 输出: 张三, 26岁

// merge - 合并多个同类型 Publisher
let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<Int, Never>()

Publishers.Merge(pub1, pub2)
    .sink { print($0) }

pub1.send(1)  // 1
pub2.send(2)  // 2

// zip - 配对组合
let letters = ["A", "B", "C"].publisher
let numbers = [1, 2, 3].publisher

Publishers.Zip(letters, numbers)
    .sink { print("\($0)\($1)") }  // A1, B2, C3

错误处理操作符

// catch - 捕获错误并提供替代
fetchData()
    .catch { error in
        Just(fallbackData)  // 发生错误时使用备用数据
    }
    .sink { data in print(data) }

// retry - 重试
fetchData()
    .retry(3)  // 失败后重试3次
    .sink(receiveCompletion: { _ in },
          receiveValue: { data in print(data) })

// replaceError - 替换错误为默认值
fetchData()
    .replaceError(with: defaultData)
    .sink { data in print(data) }

5. 实战案例

网络请求

struct Post: Codable {
    let id: Int
    let title: String
}

class APIService {
    func fetchPosts() -> AnyPublisher<[Post], Error> {
        let url = URL(string: "https://api.example.com/posts")!
        
        return URLSession.shared.dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: [Post].self, decoder: JSONDecoder())
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

// 使用
class ViewModel: ObservableObject {
    @Published var posts: [Post] = []
    @Published var error: String?
    
    private var cancellables = Set<AnyCancellable>()
    private let api = APIService()
    
    func loadPosts() {
        api.fetchPosts()
            .sink(
                receiveCompletion: { [weak self] completion in
                    if case .failure(let error) = completion {
                        self?.error = error.localizedDescription
                    }
                },
                receiveValue: { [weak self] posts in
                    self?.posts = posts
                }
            )
            .store(in: &cancellables)
    }
}

搜索防抖

class SearchViewModel: ObservableObject {
    @Published var searchText = ""
    @Published var results: [SearchResult] = []
    
    private var cancellables = Set<AnyCancellable>()
    
    init() {
        $searchText
            .debounce(for: .milliseconds(300), scheduler: RunLoop.main)
            .removeDuplicates()
            .filter { !$0.isEmpty }
            .flatMap { query in
                self.search(query: query)
                    .catch { _ in Just([]) }
            }
            .receive(on: DispatchQueue.main)
            .assign(to: &$results)
    }
    
    private func search(query: String) -> AnyPublisher<[SearchResult], Error> {
        // 搜索 API 调用
    }
}

表单验证

class FormViewModel: ObservableObject {
    @Published var email = ""
    @Published var password = ""
    @Published var isValid = false
    
    private var cancellables = Set<AnyCancellable>()
    
    init() {
        // 组合验证
        Publishers.CombineLatest($email, $password)
            .map { email, password in
                self.isValidEmail(email) && password.count >= 8
            }
            .assign(to: &$isValid)
    }
    
    private func isValidEmail(_ email: String) -> Bool {
        email.contains("@") && email.contains(".")
    }
}

6. 调度器 (Scheduler)

控制代码执行的线程:

fetchData()
    .subscribe(on: DispatchQueue.global())  // 在后台线程订阅
    .receive(on: DispatchQueue.main)        // 在主线程接收
    .sink { data in
        // 这里在主线程,可以更新 UI
        self.updateUI(data)
    }

常用调度器

调度器 用途
DispatchQueue.main 主线程,UI 更新
DispatchQueue.global() 后台线程,耗时操作
RunLoop.main 主运行循环
ImmediateScheduler.shared 立即执行,同步

7. Combine vs async/await

场景 Combine async/await
单次异步操作 ⭐⭐ ⭐⭐⭐
持续数据流 ⭐⭐⭐ ⭐⭐
UI 绑定 ⭐⭐⭐ ⭐⭐
代码简洁度 ⭐⭐ ⭐⭐⭐
取消管理 手动 自动

桥接两者

// Combine → async/await
let value = try await publisher.values.first(where: { _ in true })

// async/await → Combine
func asyncToPublisher() -> AnyPublisher<Data, Error> {
    Future { promise in
        Task {
            do {
                let data = try await fetchDataAsync()
                promise(.success(data))
            } catch {
                promise(.failure(error))
            }
        }
    }
    .eraseToAnyPublisher()
}

8. 最佳实践

✅ 推荐

// 1. 使用 store(in:) 管理生命周期
publisher
    .sink { }
    .store(in: &cancellables)

// 2. 使用 eraseToAnyPublisher() 隐藏实现细节
func fetchUser() -> AnyPublisher<User, Error> {
    // ...
    .eraseToAnyPublisher()
}

// 3. 使用 weak self 避免循环引用
publisher
    .sink { [weak self] value in
        self?.handle(value)
    }

❌ 避免

// 1. 不要忽略返回的 AnyCancellable
let _ = publisher.sink { }  // ❌ 立即取消

// 2. 不要在 sink 中进行复杂逻辑
publisher
    .sink { value in
        // ❌ 应该把逻辑放到操作符中
        if value > 0 {
            let transformed = value * 2
            // ...
        }
    }

// 3. 不要过度嵌套
publisher
    .flatMap { a in
        anotherPublisher
            .flatMap { b in  // ❌ 难以阅读
                yetAnother
            }
    }