コマンドディスパッチ

ディスパッチシステムはプロセスからハンドラにコマンドをルーティングします。プロセスは相関タグ付きでコマンドをyieldし、ハンドラが非同期作業を実行し、結果がイベントキュー経由で戻ります。

フロー

sequenceDiagram
    participant P as プロセス
    participant W as ワーカー
    participant R as レジストリ
    participant H as ハンドラ

    P->>W: yield(command, tag)
    W->>R: getHandler(cmdID)
    R-->>W: handler
    W->>H: Handle(cmd, tag, receiver)
    H-->>H: 非同期作業
    H->>W: CompleteYield(tag, result)
    W->>P: イベントをキュー、ウェイク
    P->>P: 結果で再開

コマンドレジストリ

レジストリはハイブリッド構造でハンドラを格納:

type Registry struct {
    handlers [256]Handler         // システムコマンド: O(1)インデックス
    extended map[CommandID]Handler // 拡張コマンド: マップルックアップ
    frozen   atomic.Bool          // ブート後はロックフリー
}

システムコマンド(0-255)は配列インデックスを使用。拡張コマンドはマップルックアップを使用。Freeze()後、すべてのルックアップはロックフリー。

コマンドID範囲

範囲 モジュール
1-9 process Send, Spawn, Terminate, Monitor, Link
10-29 clock Sleep, Ticker, Timer
50-59 stream Read, Write, Close, Seek
60-79 http Request, RequestBatch
80-89 websocket Connect, Send, Receive
90-99 event Subscribe, Send
100-119 sql Query, Execute, Transaction ops
120-129 store Get, Set, Delete, Has
130-139 security ValidateToken, CreateToken
140-149 function Call, AsyncStart, AsyncCancel
150-159 exec ProcessWait
160-169 cloudstorage Upload, Download, List, Presigned URLs
170-179 eval Compile, Run, CreateProcess
180-189 workflow SideEffect, Call, Version, UpsertAttrs
190-199 contract Open, Call, AsyncCall, AsyncCancel
256+ custom ユーザー定義サービス

登録はMustRegisterCommands()経由でブート時に行われます。衝突はスタートアップ時にパニック。

コマンドの定義

コマンドは一意のCommandIDを持つデータ構造です:

const MyCommand dispatcher.CommandID = 200

type MyCmd struct {
    Input  string
    Option int
}

var myCmdPool = sync.Pool{New: func() any { return &MyCmd{} }}

func (c *MyCmd) CmdID() dispatcher.CommandID { return MyCommand }

func (c *MyCmd) Release() {
    c.Input = ""
    c.Option = 0
    myCmdPool.Put(c)
}

プール再利用でホットパスでのアロケーションを排除。パッケージinitで登録:

func init() {
    dispatcher.MustRegisterCommands("myservice", MyCommand)
}

ディスパッチャー

ディスパッチャーは関連ハンドラをグループ化します。RegisterAllでハンドラを登録し、セットアップ/ティアダウン用のライフサイクルメソッドを実装:

type Handler interface {
    Handle(ctx context.Context, cmd Command, tag uint64, receiver ResultReceiver) error
}

type ResultReceiver interface {
    CompleteYield(tag uint64, data any, err error)
}
type Dispatcher struct {
    // サービス状態
}

func (d *Dispatcher) RegisterAll(register func(id dispatcher.CommandID, h dispatcher.Handler)) {
    register(myapi.MyCommand, dispatcher.HandlerFunc(d.handleMyCommand))
}

func (d *Dispatcher) handleMyCommand(ctx context.Context, cmd Command, tag uint64, receiver ResultReceiver) error {
    c := cmd.(*myapi.MyCmd)
    go func() {
        result := doWork(c)
        if ctx.Err() == nil {
            receiver.CompleteYield(tag, result, nil)
        }
    }()
    return nil
}

ブートコンポーネントとして登録:

func MyDispatcher() boot.Component {
    return boot.New(boot.P{
        Name:      "dispatcher.myservice",
        DependsOn: []boot.Name{DispatcherName},
        Load: func(ctx context.Context) (context.Context, error) {
            reg := dispatcher.GetRegistrar(ctx)
            svc := myservice.NewDispatcher()
            svc.RegisterAll(reg.Register)
            return ctx, nil
        },
    })
}

Yieldと相関

プロセスが非同期作業を必要とする場合、相関タグ付きでコマンドをyield:

type Yield struct {
    Cmd Command
    Tag uint64    // 相関用のプロセスローカルカウンター
}

ワーカーは各ステップ後にStepOutputからyieldを抽出し、ハンドラにディスパッチします。各タグはリクエストを一意に識別し、結果をマッチバック可能にします。

関連項目