1. はじめに
1.1 本記事のゴールと想定読者
- ゴール: Combineの基礎(Publisher/Subscriber/Operator)を、SwiftUIと連携しながら“手を動かして”理解する。
- 読者: Rx経験がない・少ない人、SwiftUIのデータフローをもう少し強化したい人。
1.2 実行環境
- iOS 16 以降/Swift 5.9 以降/Xcode 15 以降
- すべてPlaygroundでも簡易動作、SwiftUI プロジェクトでもそのまま利用可能。
1.3 Combineはいつ使う?Swift Concurrencyとの役割分担
- Swift Concurrency(async/await)は1回の非同期タスクに強い。例: 単発のAPI呼び出し。
- Combineは連続するイベントのストリームに強い。例: テキスト入力・通知・タイマー・ストリームAPI。
- どちらも併用可能。Publisherを
AsyncSequence
として扱え、逆にPassthroughSubject
でAsyncStream
相当も作れる。
2. メンタルモデル:データが「流れる」
2.1 Publisher/Subscriber/Operator/Subscription
- Publisher: 値または完了/失敗イベントを流す側
- Subscriber: 値を受け取る側(
sink
やassign
が代表) - Operator: 中間で値を変換・間引き・結合する演算子(
map
/filter
/combineLatest
など) - Subscription: 購読の実体。
AnyCancellable
を保持しないと途中で解除される。
2.2 Demand(バックプレッシャー)の直感
- Subscriberは「どれくらいの値を要求するか」を伝えられる(Demand)。
- 初学者はまず**“無限に受け取る”**(
sink
既定)と覚え、必要になったら調整・buffer
等を学べばOK。
2.3 “値→演算子→購読” のパイプライン図
[Publisher] --map--> --filter--> [Subscriber] // 左から右にパイプで値が流れるイメージ。
3. 最小のパイプラインから始める
3.1 Just
+ sink
(とにかく動かす)
import Combine
let cancellable = Just("Hello, Combine!")
.sink { value in
print(value) // => Hello, Combine!が流れる
}
// Point: Justは1回だけ値を出して完了するPublisher。
3.2 assign
と @Published
(UIプロパティへの反映)
import Combine
import SwiftUI
final class Counter: ObservableObject {
@Published var text: String = ""
private var bag = Set<AnyCancellable>()
func start() {
["1","2","3"].publisher
.map { "Count: \($0)" }
.sink { [weak self] in
self?.text = $0
}
.store(in: &bag)
}
}
let counter = Counter()
counter.start()
sink(receiveValue:)
で@Published
プロパティに代入するのがシンプル。.assign(to:on:)
もあるが、SwiftUIではsink
代入が直感的でデバッグしやすい。
3.3 ライフサイクル:AnyCancellable
と store(in:)
sink
やassign
は購読を返す。これを保持しないと即キャンセルされる。Set<AnyCancellable>
にstore(in:)
でまとめて保持するのが定石。
4. 主要Publisher早見
4.1 値をその場で出す/遅延/一度だけ
- Just: 即値→完了
- Deferred: 購読が始まった瞬間に生成したいとき
- Future: 一度だけ非同期で結果を返す
let future = Future<Int, Error> { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + 0.3) {
promise(.success(42))
}
}
4.2 イベントを流すSubject
- PassthroughSubject: 現在値を持たない“通過点”。
- CurrentValueSubject: 最新値を保持し、購読時に直ちに流す。
let subject = PassthroughSubject<String, Never>()
let current = CurrentValueSubject<Int, Never>(0)
4.3 OS連携
- Timer.publish: 一定間隔で値。
- NotificationCenter.Publisher: システム通知をPublisher化。
- URLSession.dataTaskPublisher: ネットワークレスポンス。
5. よく使うOperatorを直感で
5.1 変換:map
/ compactMap
/ tryMap
["1","x","2"].publisher
.compactMap(Int.init) // x は捨てる
.map { $0 * 2 } // 2,4
.sink { print($0) }
5.2 フィルタ:filter
/ removeDuplicates
/ prefix
/ drop
[1,2,2,3,4,4,5].publisher
.removeDuplicates()
.filter { $0.isMultiple(of: 2) }
.sink { print($0) } // 2,4
5.3 結合:merge
/ zip
/ combineLatest
import Combine
import SwiftUI
let a = [1,3,5].publisher
let b = [2,4,6].publisher
a.merge(with: b) // 到着順に混ざる
.sink { print($0) }
Publishers.Zip(a, b) // ペアで出る: (1,2),(3,4),(5,6)
.sink { print($0) }
Publishers.CombineLatest(a, b) // どちらかが更新される度に最新ペア
.sink { print($0) }
5.4 非同期の入れ子:flatMap
/ switchToLatest
import Combine
import SwiftUI
let querySubject = PassthroughSubject<String, Never>()
// ここでは、検索クエリを流す
querySubject
.debounce(for: .milliseconds(250), scheduler: DispatchQueue.main)
.removeDuplicates()
.map { query in apiPublisher(for: query) } // Publisher<Results, Error>
.switchToLatest() // 直近の検索だけを採用
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
// ここでは、APIを模したPublisherを返す
func apiPublisher(for q: String) -> AnyPublisher<[String], Error> {
Just(["\(q)1", "\(q)2"]).setFailureType(to: Error.self)
.delay(for: .milliseconds(Int.random(in: 100...400)), scheduler: DispatchQueue.global())
.eraseToAnyPublisher()
}
5.5 時間系:debounce
/ throttle
/ delay
- debounce: 入力が止まってから発火(タイプアヘッド向き)
- throttle: 一定間隔より高頻度なイベントを間引く
- delay: 配信を遅らせる
5.6 蓄積:scan
/ reduce
import Combine
import SwiftUI
[1,2,3,4].publisher
.scan(0, +) // 1,3,6,10(部分和)
.sink { print($0) }
5.7 エラー処理:mapError
/ catch
/ retry
import Combine
import SwiftUI
// Failure == Never は失敗しないPublisher(例: Just)。
failablePublisher()
.retry(2)
.mapError { MyError.network($0) }
.catch { _ in
Just("fallback").setFailureType(to: MyError.self)
}
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
5.8 各オペレータの役割まとめ
オペレーター | 役割 | 実務での使い所 | ポイント |
---|
map | 値の変換 | モデル→ViewModel変換、Int→Stringなど | 単純な値変換に。副作用を入れない設計が好ましい。 |
compactMap | nilを除外した変換 | String→Int(失敗する可能性あり)など | nil 除外したい場合はこちら。 |
tryMap | throw 可能な変換 | ネットワークレスポンスのデコード | map + エラーハンドリングしたいとき。 |
filter | 条件に合う値だけ通す | 入力制限、特定の状態だけ通す | パスワード8文字以上、などの条件に。 |
removeDuplicates | 同じ値の重複を除く | テキスト入力の連打対策 | Equatable 必須。$text と併用多い。 |
debounce | イベント後の静止を待つ | 検索フォームのタイプアヘッド | DispatchQueue.main などのScheduler指定が必要。 |
throttle | 一定時間に1回だけ通す | スクロール連打防止など | latest: で直近/先頭どちらを通すか選べる。 |
delay | 指定時間遅らせて発火 | アニメーションやUI表示タイミングの調整 | 完全な非同期化ではない。 |
merge | 複数のPublisherを混ぜる | 複数のイベントを1つにまとめたいとき | 到着順でそのまま流れる。 |
zip | 両方のPublisherから1つずつ出す | 並列リクエスト→両方完了してからUI更新 | ペアにしたい場面。ストリームが異なる長さだと片方で止まる。 |
combineLatest | どちらか更新時に最新ペア出す | 入力フォームの全体バリデーション | すでに値がある状態でないと出力されない。 |
flatMap | 入れ子Publisherを展開 | 各イベントで非同期APIを叩くなど | 多重購読になるため注意(例:連打対応にはswitchToLatest 推奨)。 |
switchToLatest | 直近のPublisherだけを採用 | 検索フォーム→API連打対策 | 中断性のあるストリーム処理に必須。 |
catch | エラー時に代替Publisher返す | ネットワーク失敗→ローカルキャッシュで代替 | tryMap や flatMap と併用しやすい。 |
retry(_:) | エラー時に再試行する | 一時的な通信エラーのリトライ | Failure がある型に限る。 |
scan | 値を累積して出す | スコア加算、ヒストリ作成など | 全体ではなく“都度”結果を出す。 |
reduce | 値を累積して1つにまとめる | 最終的な合計や集計を1回だけ出す | collect() と併用して使うケースが多い。 |
assign(to: &$var) | 値を直接プロパティに代入 | @Published や@State 変数へ代入 | SwiftUI連携では最短で済む書き方。 |
6. スレッドとスケジューラ
6.1 subscribe(on:)
と receive(on:)
の違い
subscribe(on:)
: 上流の仕事(ネットワーク/重い処理)をどのスレッドで開始するか。receive(on:)
: 以降の下流が受け取るスレッドを切り替える(UI更新はメイン)。
import Combine
import SwiftUI
URLSession.shared.dataTaskPublisher(for: URL(string: "https://example.com")!)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.map(\.(data))
.decode(type: Response.self, decoder: JSONDecoder())
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { _ in }, receiveValue: { [weak self] value in
self?.result = value
})
.store(in: &bag)
6.2 スケジューラの選び分け
- DispatchQueue: 汎用的。メイン/UIもここ。
- RunLoop: タイマーやUIイベントに近い。
Timer.publish
と好相性。 - OperationQueue: 依存関係を組みたい処理。
6.3 UIスレッド保証と@MainActor
- ViewModelのUI反映メソッドを@MainActorにしておくと安全。
7. SwiftUIとの連携パターン
7.1 ObservableObject
と @Published
/@StateObject
/@ObservedObject
import Combine
import SwiftUI
final class SearchViewModel: ObservableObject {
@Published var query = ""
@Published private(set) var results: [String] = []
private var bag = Set<AnyCancellable>()
init() {
bind()
}
private func bind() {
$query
.debounce(for: .milliseconds(250), scheduler: DispatchQueue.main)
.removeDuplicates()
.flatMap { Self.search($0).replaceError(with: []) }
.receive(on: DispatchQueue.main)
.assign(to: &$results) // 代入専用の短縮構文(Xcodeの補完で出ない場合はsink代入でOK)
}
static func search(_ q: String) -> AnyPublisher<[String], Error> {
Just(["\(q)A", "\(q)B"]).setFailureType(to: Error.self)
.delay(for: .milliseconds(200), scheduler: DispatchQueue.global())
.eraseToAnyPublisher()
}
}
struct SearchView: View {
@StateObject private var vm = SearchViewModel()
var body: some View {
VStack {
TextField("Search", text: $vm.query)
.textFieldStyle(.roundedBorder)
List(vm.results, id: \.) { Text($0) }
}
.padding()
}
}
7.2 .onReceive
でシンプルに取り込む
import Combine
import SwiftUI
struct ClockView: View {
@State private var now = Date()
private let timer = Timer.publish(every: 1, on: .main, in: .common).autoconnect()
var body: some View {
Text(now.formatted())
.onReceive(timer) { now = $0 }
}
}
7.3 assign(to: &$property)
と assign(to:on:)
assign(to: &$property)
: @Published
/@State
などプロパティへの直代入に便利。assign(to:on:)
: 既存クラスのプロパティにKeyPathで代入。
7.4 Viewのライフサイクルとキャンセル戦略
@StateObject
でViewライフサイクルに沿って購読を保持。onDisappear
でbag.removeAll()
など明示キャンセルも状況次第で有効。
8. 直感的にわかる実践サンプル(完成コード付き)
8.1 タイプアヘッド検索:debounce
+ removeDuplicates
+ switchToLatest
import Combine
import SwiftUI
struct TypeaheadView: View {
@StateObject private var vm = TypeaheadViewModel()
var body: some View {
VStack(alignment: .leading, spacing: 12) {
TextField("Search repositories...", text: $vm.query)
.textFieldStyle(.roundedBorder)
Group {
if vm.isLoading { ProgressView() }
if let error = vm.error { Text(error).foregroundStyle(.red) }
}
List(vm.results, id: \.self) { Text($0) }
}
.padding()
}
}
final class TypeaheadViewModel: ObservableObject {
@Published var query = ""
@Published private(set) var results: [String] = []
@Published private(set) var isLoading = false
@Published private(set) var error: String? = nil
private var bag = Set<AnyCancellable>()
init() {
$query
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
.removeDuplicates()
.map { [unowned self] q -> AnyPublisher<[String], Never> in
guard !q.isEmpty else { return Just([]).eraseToAnyPublisher() }
self.isLoading = true
self.error = nil
return Self.searchAPI(q)
.handleEvents(receiveCompletion: { _ in self.isLoading = false })
.catch { [weak self] err -> Just<[String]> in
self?.error = err.localizedDescription
return Just([])
}
.eraseToAnyPublisher()
}
.switchToLatest()
.receive(on: DispatchQueue.main)
.assign(to: &$results)
}
private static func searchAPI(_ q: String) -> AnyPublisher<[String], Error> {
// デモ用ダミー(本番はURLSession.dataTaskPublisherでOK)
Just(["\(q) Kit", "\(q) Tools", "Awesome \(q)"])
.tryMap { arr -> [String] in
if Bool.random() { return arr } else { throw URLError(.badServerResponse) }
}
.delay(for: .milliseconds(400), scheduler: DispatchQueue.global())
.eraseToAnyPublisher()
}
}
8.2 フォームバリデーション:combineLatest
import Combine
import SwiftUI
final class SignUpViewModel: ObservableObject {
@Published var email = ""
@Published var password = ""
@Published private(set) var isValid = false
private var bag = Set<AnyCancellable>()
init() {
Publishers.CombineLatest($email, $password)
.map { email, pass in
email.contains("@") && pass.count >= 8
}
.removeDuplicates()
.assign(to: &$isValid)
}
}
struct SignUpView: View {
@StateObject private var vm = SignUpViewModel()
var body: some View {
Form {
TextField("Email", text: $vm.email)
.keyboardType(.emailAddress)
SecureField("Password (8+)", text: $vm.password)
Button("Create Account") { /* submit */ }
.disabled(!vm.isValid)
}
}
}
8.3 タイマーUI:Timer.publish
+ autoconnect
import Combine
import SwiftUI
struct PomodoroView: View {
@State private var seconds = 25 * 60
private let tick = Timer.publish(every: 1, on: .main, in: .common).autoconnect()
var body: some View {
Text("\(seconds / 60):\(String(format: "%02d", seconds % 60))")
.font(.system(.largeTitle, design: .rounded))
.onReceive(tick) { _ in
if seconds > 0 { seconds -= 1 }
}
}
}
8.4 通知の受信:キーボード表示高さをViewに反映
import Combine
import SwiftUI
final class Keyboard: ObservableObject {
@Published private(set) var height: CGFloat = 0
private var bag = Set<AnyCancellable>()
init() {
NotificationCenter.default.publisher(for: UIApplication.keyboardWillChangeFrameNotification)
.compactMap { $0.userInfo?[UIResponder.keyboardFrameEndUserInfoKey] as? CGRect }
.map { $0.height }
.receive(on: DispatchQueue.main)
.assign(to: &$height)
}
}
struct ChatInputView: View {
@StateObject private var kb = Keyboard()
@State private var text = ""
var body: some View {
VStack {
Spacer()
TextField("Message...", text: $text)
.textFieldStyle(.roundedBorder)
.padding(.bottom, kb.height)
.animation(.easeOut(duration: 0.25), value: kb.height)
}
.padding()
}
}
8.5 KVOブリッジ:@objc dynamic
+ KeyPath Publisher
import Combine
import SwiftUI
class Player: NSObject { // NSObjectが必要
@objc dynamic var progress: Double = 0
}
let player = Player()
var bag = Set<AnyCancellable>()
player.publisher(for: \.progress)
.sink { print("progress:", $0) }
.store(in: &bag)
// どこかで
player.progress = 0.5 // => progress: 0.5
9. Swift Concurrencyブリッジ(async/awaitと合わせ技)
9.1 Publisherを AsyncSequence
として扱う:.values
import Combine
import SwiftUI
func consumeAsyncSequence<P: Publisher>(_ p: P) async where P.Failure == Never {
for await value in p.values { // 非同期for-awaitで取り出す
print(value)
}
}
9.2 Task
と onReceive
の選び分け
- UIに素直に反映したい:
.onReceive
やassign
。 - 複雑な非同期処理を行いたい:
Task
内でfor await
やawait
を使う。
9.3 PassthroughSubject
で AsyncStream
相当
import Combine
import SwiftUI
let subject = PassthroughSubject<Int, Never>()
func makeAsyncStream() -> AsyncStream<Int> {
AsyncStream { continuation in
let c = subject.sink { value in
continuation.yield(value)
}
continuation.onTermination = { _ in c.cancel() }
}
}
10. メモリ管理 & トラブル回避
10.1 [weak self]
の指針と「イベントが来ない」の典型原因
- クロージャ内で
self
を参照すると強参照循環になり得る→[weak self]
を基本に。 - AnyCancellableを保持していないと即キャンセル→
bag
を用意しstore(in:)
。
10.2 予期せぬ多重購読/リークを防ぐチェックリスト
init
/onAppear
でbind()
→重複実行していないかshare()
やmulticast
でUpstreamの重複実行を防ぐ
10.3 share()
/ multicast
で無駄再実行回避
import Combine
import SwiftUI
let shared = URLSession.shared.dataTaskPublisher(for: url)
.map(\.(data))
.share()
shared
.sink { _ in } receiveValue: { print("A:", $0.count) }
.store(in: &bag)
shared
.sink { _ in } receiveValue: { print("B:", $0.count) }
.store(in: &bag)
11. テスト戦略
11.1 XCTestでの非同期検証(XCTestExpectation
+ collect
)
import XCTest
import Combine
final class SignUpViewModelTests: XCTestCase {
var bag: Set<AnyCancellable>!
override func setUp() {
bag = []
}
func testValidation() {
let vm = SignUpViewModel()
let exp = expectation(description: "valid emits true")
vm.$isValid.dropFirst().sink { isValid in
if isValid {
exp.fulfill()
}
}
.store(in: &bag)
vm.email = "a@b.com"
vm.password = "12345678"
wait(for: [exp], timeout: 1.0)
}
}
11.2 スケジューラ制御(仮想時間の考え方)
- Combine標準には仮想時間スケジューラはない。
debounce
等のテストは、依存をScheduler
プロトコルで渡せる設計にしてテスト用スケジューラを差し替える設計が有効(外部ライブラリ活用も選択肢)。
11.3 ViewModelの入力・出力を観測するテスト設計
- 入力:
@Published
に値を流す - 出力:
@Published
やPassthroughSubject
をcollect
/sink
で観測 - 副作用は
handleEvents
でフックして検証もしやすい
12. まとめ
12.1 学び直し用のKeyポイント
- 「値が流れるパイプ」という一本のイメージで理解し直す。
- まずは
sink
で受ける→map
/filter
→flatMap
/switchToLatest
→スケジューラ→エラー処理→共有の順に層を重ねる。
12.2 Next Step:Combineを捨てずにConcurrencyと共存
- 単発処理:async/await。
- 継続イベント:Combine。
- 両者をブリッジできると設計の自由度が上がる。
参考リンク