ぷらすのブログ

レイヤードアーキテクチャを採用した際のWebSocket実装例

#開発#GitHub#Go#WebSocket

こんにちは、@p1assです。

先日、レイヤーアーキテクチャを採用している Web API サーバに WebSocket を組み込むことになったのですが、コネクションの管理やどのレイヤーで各機能を管理するか悩んだのでブログにまとめておきます。

使用している言語は Go で、Web フレームワークは echo です。

WebSocket 実装前の API サーバの構成

WebSocket を実装する前の API サーバのディレクトリ構成は次のようになっていました。(ブログ用に調整を加えてます。)

.
├── database # repository のインターフェースを満たす実体
├── domain
│ ├── entity
│ └── repository
│ └── service
├── main.go
├── usecase
└── web
├── handler
└── router.go

処理の流れは以下の通りです。

web/router.go
↓
web/handler/_.go
↓
usecase/_.go
↓
domain/\*

レイヤードアーキテクチャを採用している無難なパッケージ構成になっています。明確に DDD や Clean Architecture であるとは言えないですが、その思想を取り入れつつ独自にカスタマイズしています。[^1]

[^1] ここではこのアーキテクチャの良し悪しについては語りません。話が逸れすぎるので。

WebSocket の機能要件

今回の WebSocket の要件は以下の通りです。

メッセージのやり取りはサーバからクライアントへの一方向のみ、加えてプロセスも 1 個なのでそこまで複雑にはならない想定です。

設計する上で考えたこと

さて、この API サーバに WebSocket の通信を実装していくわけですが、まずは設計をしていきます。

レイヤーを用いた責務の分離

レイヤードアーキテクチャを利用している以上当たり前ですが、責務を分離して見通しやすい実装を目指します。

gorutine リーク、メモリリーク

通常の HTTP リクエストでは、1 リクエストごとに一つの gorutine が生成されレスポンスを返したら gorotuine が終了します。

しかし、WebSocket のコネクションを扱う場合は接続している間は goroutine が生きたままになります。コネクション切断されたときに正しく goroutine の終了処理をしないと、段々使われなくなった goroutine が溜まっていきメモリを圧迫してしまいます。

スレッドセーフ

WebSocket コネクションの作成/削除時に一つの map や slice に複数の goroutine からアクセスされるため、スレッドセーフになるように実装する必要があります。

また、channel を使う場合は適切にサイズを指定してデッドロックしないように気をつける必要があります。

実装

上記を気をつけつつ実装を行います。

インターフェースの定義

まずは、メッセージの送信です。ドメインロジックの処理結果などを通知するのに使われるため、インターフェースは domain package で宣言します。

type Pusher interface {
  Push(pushMsg *PushMessage) error
}

type PushMessage struct {
  RoomID string
  Event *entity.Event
}
type Event struct {
  Type string `json:"type`
  Content string `json:"content"`
}

使う側はこんな感じで呼び出します。

type RoomUseCase struct{
  repo repository.Room
  pusher event.Pusher
  // ...
}

func (uc *RoomUseCase) Join(roomID string, user *entity.User) error {
  room, err := uc.repo.GetByID(roomID)
  if err != nil {}

  if !room.CanJoin(user) {
    return errors.New("cannot join to room")
  }

  room.Join(user)
  if err := uc.repo.save(room); err != nil {}

  e := &entity.Event{
    Type: "JOIN",
    Content: "hogehoge",
  }
  if err := uc.pusher.Push(roomID, e); err != nil {}

  return nil
}

ユースケースでは WebSocket の詳細には関与せず、ただメッセージを送信するだけに留めます。

Pusher インターフェースの実装

次に Pusher インターフェースを満たす構造体を作ります。この構造体は複数の WebSocket コネクションを一括してハンドリングし、適切なコネクションに対してメッセージを送信します。ソースコードは web/ws/*.go に配置します。web パッケージ内におくかどうかは悩んだのですが、HTTP 上のプロトコルなのでここにしました。

WebSocket を扱うライブラリは gorrila/websocket を採用しています。README にも書かれている通り、準標準の golang.org/x/net/websocket は機能が不足しています。GoDocには代替案として gorrila/websocket が書いてあるので今回はこちらを採用しました。

実装はgorrila/websocket の exampleを非常に参考にさせていただきました。

まず、WebSocket のコネクションをラップする構造体を作成します。

type Client struct {
  roomID      string
  conn             *websocket.Conn
  pushCh         chan *entity.Event
  notifyClosedCh chan<- *Client // HubのunregisterChをもらう
}

この構造体は一つの WebSocket コネクションと一対一で対応します。そして、この Client 1 つごとに 1 つの goroutine を起動し、メッセージを送信するループを実行します。

ループがエラーで終了したときはコネクションを閉じて後述する Hub に対して通知します。

func (c *Client) PushLoop() {
  ticker := time.NewTicker(pingPeriod)
  defer func() {
    ticker.Stop()
    c.notifyClosedCh <- c
    c.conn.Close()
  }()

  for {
    select {
    case msg, ok := <-c.pushCh:
      c.ws.SetWriteDeadline(time.Now().Add(writeWait))
      if !ok {
        if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
          fmt.Printf("failed to write close message: %v\n", err)
          return
        }
      }

      if err := c.conn.WriteJSON(msg); err != nil {
        fmt.Printf("failed to WriteJSON: %v\n", err)
        return
      }
    case <-ticker.C:
      c.ws.SetWriteDeadline(time.Now().Add(writeWait))
      if err := c.conns.WriteMessage(websocket.PingMessage, nil); err != nil {
        fmt.Printf("failed to ping: %v\n", err)
        return
      }
    }
  }
}

次に、すべての Client を管理する Hub 構造体を作成します。

type Hub struct {
  // 1つ目のキーがルームID
  // O(1) で Client を削除できるようにmapでClientを持つ
  clientsPerRoom map[string]map[*Client]struct{}
  pushMsgCh         chan *event.PushMessage
  registerCh        chan *Client
  unregisterCh      chan *Client
}

コメントにもある通り、コネクションが切断されたときに Client を削除しやすいように map で Client を持ちます。map をスレッドセーフに扱えるように clientsPerRoom へのアクセスはすべて Run() のループから行うようにします。

func (h *Hub) Register(client *Client) {
  h.registerCh <- client
}
func (h *Hub) Unregister(client *Client) {
  h.unregisterCh <- client
}

func (h *Hub) Run() {
  for {
    select {
    case cli := <-h.registerCh:
      h.register(cli)
    case cli := <-h.unregisterCh:
      h.unregister(cli)
    case pushMsg := <-h.pushMsgCh:
      h.push(pushMsg)
    }
  }
}

func (h *Hub) register(cli *Client) {
  roomID := cli.roomID
  if _, ok := h.clientsPerRoom[roomID]; ok {
    h.clientsPerRoom[roomID][cli] = struct{}{}
    return
  }
  h.clientsPerRoom[roomID] = map[*Client]struct{}{cli: {}}
}

func (h *Hub) unregister(cli *Client) {
  roomID := cli.roomID
  if _, ok := h.clientsPerRoom[roomID][cli]; ok {
    delete(h.clientsPerRoom[roomID], cli)
  }
}

次に、Pusher インターフェースを満足するメソッドを生やします。

func (h *Hub) Push(pushMsg *event.PushMessage) {
  h.pushMsgCh <- pushMsg
}

func (h *Hub) push(pushMsg *event.PushMessage) {
  for cli := range h.clientsPerRoom[pushMsg.RoomID] {
    cli.pushCh <- pushMsg.Msg
  }
}

Push() は channel を通じてメッセージを送り、Run() で受け取ります。その後、各 Client で動いている goroutine へ channel を通じてメッセージを送ります。

Hub 側ではなく Client 側の gorutine で WebSocket コネクションへの書き込みを行っているのはデッドロックを回避するためです。

もし、WebSocket コネクションへの書き込みが失敗した場合、unregisterCh を通じて Client の登録解除を試みます。しかし、同じ goroutine で書き込みを行っている場合、HubRun() で動いているメインループは h.push(pushMsg) でブロックされており、case cli := <-h.unregisterCh で登録解除のメッセージを受け取ることはできません。

channel のバッファーのサイズを指定することでブロックせずキューに登録解除の通知を溜めることもできますが、キューが溢れたらデッドロックしてしまうことには変わりないです。いつ発生するか分からない恐怖に耐えるよりかは Client 側の別 goroutine に処理を移譲した方が安心できます。

最後に、ハンドラーの処理と Hub のループを goroutine で実行する処理を書きます。

func (h *WebSocketHandler) WebSocket(c echo.Context) error {
  roomID := c.Param("id")

  wsConn, err := h.upgrader.Upgrade(c.Response(), c.Request(), nil)
  if err != nil {
    c.Logger().Errorf("upgrader.Upgrade: %v", err)
    return echo.NewHTTPError(http.StatusInternalServerError)
  }

  wsCli := ws.NewClient(roomID, wsConn, h.hub.UnregisterCh())
  h.hub.Register(wsCli)

  go wsCli.PushLoop()
  return nil
}
func main() {
  // ...
  hub := ws.NewHub()
  go hub.Run()
  // ...
}

この実装の辛み

これで実装は出来ているわけですが、ベストではないと考えています。

特に、Client の終了を Hub に伝える処理が微妙です。現在は HubunregisterChClient 側に渡していますが、Hub が管理している channel を送信可能状態(chan<- T)で外部公開するのに抵抗があります。

送信可能な channel(chan<- T)を外部に公開すると、間違って channel を closeされる可能性があります。外部に公開するのは受信専用の channel(<-chan T)にしておきたいところですが、そうすると Client 側で生成された n 個分の channel を別途 Hub 側で管理する必要がありしんどいなぁと思っています。

良さげなアイデアがあれば Twitter なりブコメなりで教えてくれると助かります。

感想

← Docker Contextsを使ってDocker Composeをデプロイする際の注意点WebサーバをNginxから証明書自動更新に対応したCaddy 2に移行した →
Topへ戻る