こんにちは、@p1assです。
先日、レイヤーアーキテクチャを採用している Web API サーバに WebSocket を組み込むことになったのですが、コネクションの管理やどのレイヤーで各機能を管理するか悩んだのでブログにまとめておきます。
使用している言語は Go で、Web フレームワークは echo です。
WebSocket 実装前の API サーバの構成
WebSocket を実装する前の API サーバのディレクトリ構成は次のようになっていました。(ブログ用に調整を加えてます。)
.
├── database # repository のインターフェースを満たす実体
├── domain
│ ├── entity
│ └── repository
│ └── service
├── main.go
├── usecase
└── web
├── handler
└── router.go
処理の流れは以下の通りです。
web/router.go
↓
web/handler/_.go
↓
usecase/_.go
↓
domain/\*
レイヤードアーキテクチャを採用している無難なパッケージ構成になっています。明確に DDD や Clean Architecture であるとは言えないですが、その思想を取り入れつつ独自にカスタマイズしています。[^1]
[^1] ここではこのアーキテクチャの良し悪しについては語りません。話が逸れすぎるので。
WebSocket の機能要件
今回の WebSocket の要件は以下の通りです。
- サーバからクライアントへのイベント通知 (JSON 形式)
- クライアントからサーバへの送信は行わない
- クライアントは複数のセグメントに分かれている (便宜上「ルーム」と呼ぶ)
- サーバからのイベント通知はルームごとに行う
- 現時点ではサーバプロセスは 1 つだが、将来的にスケールアウトすることを考慮する
- Redis PubSub などを使ってイベントの同期をする必要はないが、実装しやすいようにしておく
メッセージのやり取りはサーバからクライアントへの一方向のみ、加えてプロセスも 1 個なのでそこまで複雑にはならない想定です。
設計する上で考えたこと
さて、この API サーバに WebSocket の通信を実装していくわけですが、まずは設計をしていきます。
レイヤーを用いた責務の分離
レイヤードアーキテクチャを利用している以上当たり前ですが、責務を分離して見通しやすい実装を目指します。
gorutine リーク、メモリリーク
通常の HTTP リクエストでは、1 リクエストごとに一つの gorutine が生成されレスポンスを返したら gorotuine が終了します。
しかし、WebSocket のコネクションを扱う場合は接続している間は goroutine が生きたままになります。コネクション切断されたときに正しく goroutine の終了処理をしないと、段々使われなくなった goroutine が溜まっていきメモリを圧迫してしまいます。
スレッドセーフ
WebSocket コネクションの作成/削除時に一つの map や slice に複数の goroutine からアクセスされるため、スレッドセーフになるように実装する必要があります。
また、channel を使う場合は適切にサイズを指定してデッドロックしないように気をつける必要があります。
実装
上記を気をつけつつ実装を行います。
インターフェースの定義
まずは、メッセージの送信です。ドメインロジックの処理結果などを通知するのに使われるため、インターフェースは domain package で宣言します。
type Pusher interface {
Push(pushMsg *PushMessage) error
}
type PushMessage struct {
RoomID string
Event *entity.Event
}
type Event struct {
Type string `json:"type`
Content string `json:"content"`
}
使う側はこんな感じで呼び出します。
type RoomUseCase struct{
repo repository.Room
pusher event.Pusher
// ...
}
func (uc *RoomUseCase) Join(roomID string, user *entity.User) error {
room, err := uc.repo.GetByID(roomID)
if err != nil {}
if !room.CanJoin(user) {
return errors.New("cannot join to room")
}
room.Join(user)
if err := uc.repo.save(room); err != nil {}
e := &entity.Event{
Type: "JOIN",
Content: "hogehoge",
}
if err := uc.pusher.Push(roomID, e); err != nil {}
return nil
}
ユースケースでは WebSocket の詳細には関与せず、ただメッセージを送信するだけに留めます。
Pusher インターフェースの実装
次に Pusher インターフェースを満たす構造体を作ります。この構造体は複数の WebSocket コネクションを一括してハンドリングし、適切なコネクションに対してメッセージを送信します。ソースコードは web/ws/*.go
に配置します。web パッケージ内におくかどうかは悩んだのですが、HTTP 上のプロトコルなのでここにしました。
WebSocket を扱うライブラリは gorrila/websocket を採用しています。README にも書かれている通り、準標準の golang.org/x/net/websocket は機能が不足しています。GoDocには代替案として gorrila/websocket が書いてあるので今回はこちらを採用しました。
実装はgorrila/websocket の exampleを非常に参考にさせていただきました。
まず、WebSocket のコネクションをラップする構造体を作成します。
type Client struct {
roomID string
conn *websocket.Conn
pushCh chan *entity.Event
notifyClosedCh chan<- *Client // HubのunregisterChをもらう
}
この構造体は一つの WebSocket コネクションと一対一で対応します。そして、この Client
1 つごとに 1 つの goroutine を起動し、メッセージを送信するループを実行します。
ループがエラーで終了したときはコネクションを閉じて後述する Hub に対して通知します。
func (c *Client) PushLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.notifyClosedCh <- c
c.conn.Close()
}()
for {
select {
case msg, ok := <-c.pushCh:
c.ws.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
fmt.Printf("failed to write close message: %v\n", err)
return
}
}
if err := c.conn.WriteJSON(msg); err != nil {
fmt.Printf("failed to WriteJSON: %v\n", err)
return
}
case <-ticker.C:
c.ws.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conns.WriteMessage(websocket.PingMessage, nil); err != nil {
fmt.Printf("failed to ping: %v\n", err)
return
}
}
}
}
次に、すべての Client
を管理する Hub
構造体を作成します。
type Hub struct {
// 1つ目のキーがルームID
// O(1) で Client を削除できるようにmapでClientを持つ
clientsPerRoom map[string]map[*Client]struct{}
pushMsgCh chan *event.PushMessage
registerCh chan *Client
unregisterCh chan *Client
}
コメントにもある通り、コネクションが切断されたときに Client
を削除しやすいように map で Client
を持ちます。map をスレッドセーフに扱えるように clientsPerRoom
へのアクセスはすべて Run()
のループから行うようにします。
func (h *Hub) Register(client *Client) {
h.registerCh <- client
}
func (h *Hub) Unregister(client *Client) {
h.unregisterCh <- client
}
func (h *Hub) Run() {
for {
select {
case cli := <-h.registerCh:
h.register(cli)
case cli := <-h.unregisterCh:
h.unregister(cli)
case pushMsg := <-h.pushMsgCh:
h.push(pushMsg)
}
}
}
func (h *Hub) register(cli *Client) {
roomID := cli.roomID
if _, ok := h.clientsPerRoom[roomID]; ok {
h.clientsPerRoom[roomID][cli] = struct{}{}
return
}
h.clientsPerRoom[roomID] = map[*Client]struct{}{cli: {}}
}
func (h *Hub) unregister(cli *Client) {
roomID := cli.roomID
if _, ok := h.clientsPerRoom[roomID][cli]; ok {
delete(h.clientsPerRoom[roomID], cli)
}
}
次に、Pusher
インターフェースを満足するメソッドを生やします。
func (h *Hub) Push(pushMsg *event.PushMessage) {
h.pushMsgCh <- pushMsg
}
func (h *Hub) push(pushMsg *event.PushMessage) {
for cli := range h.clientsPerRoom[pushMsg.RoomID] {
cli.pushCh <- pushMsg.Msg
}
}
Push()
は channel を通じてメッセージを送り、Run()
で受け取ります。その後、各 Client
で動いている goroutine へ channel を通じてメッセージを送ります。
Hub
側ではなく Client
側の gorutine で WebSocket コネクションへの書き込みを行っているのはデッドロックを回避するためです。
もし、WebSocket コネクションへの書き込みが失敗した場合、unregisterCh
を通じて Client
の登録解除を試みます。しかし、同じ goroutine で書き込みを行っている場合、Hub
の Run()
で動いているメインループは h.push(pushMsg)
でブロックされており、case cli := <-h.unregisterCh
で登録解除のメッセージを受け取ることはできません。
channel のバッファーのサイズを指定することでブロックせずキューに登録解除の通知を溜めることもできますが、キューが溢れたらデッドロックしてしまうことには変わりないです。いつ発生するか分からない恐怖に耐えるよりかは Client
側の別 goroutine に処理を移譲した方が安心できます。
最後に、ハンドラーの処理と Hub
のループを goroutine で実行する処理を書きます。
func (h *WebSocketHandler) WebSocket(c echo.Context) error {
roomID := c.Param("id")
wsConn, err := h.upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
c.Logger().Errorf("upgrader.Upgrade: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError)
}
wsCli := ws.NewClient(roomID, wsConn, h.hub.UnregisterCh())
h.hub.Register(wsCli)
go wsCli.PushLoop()
return nil
}
func main() {
// ...
hub := ws.NewHub()
go hub.Run()
// ...
}
この実装の辛み
これで実装は出来ているわけですが、ベストではないと考えています。
特に、Client
の終了を Hub
に伝える処理が微妙です。現在は Hub
の unregisterCh
を Client
側に渡していますが、Hub
が管理している channel を送信可能状態(chan<- T
)で外部公開するのに抵抗があります。
送信可能な channel(chan<- T
)を外部に公開すると、間違って channel を close
される可能性があります。外部に公開するのは受信専用の channel(<-chan T
)にしておきたいところですが、そうすると Client
側で生成された n 個分の channel を別途 Hub
側で管理する必要がありしんどいなぁと思っています。
良さげなアイデアがあれば Twitter なりブコメなりで教えてくれると助かります。
感想
- レイヤードアーキテクチャだから面倒くさいと感じる部分は結構少なかった。
- Reids PubSub を使うなら、
Push
で Publish して、Hub
側で Subscribe すれば問題なく実装できそう。 - Go には goroutine や channel といった便利な機能があるが、ソフトウェア・エンジニアリング的に「正しく使う」のは意外と難しいなと感じた。