2014年09月03日

チャットで学ぶ Go ネットワークプログラミング

はてなブックマークに登録

簡単なチャットプログラムは、ネットワークプログラミング用のフレームワークでは定番のサンプルプログラムです。
echo サーバーが Hello World とするなら、チャットは FizzBuzz といったところでしょう。

とりあえず動くだけのチャットならだれでもすぐに作れるようになりますが、まじめにチャットを作ることで、 ネットワークプログラミングで考えないといけない点やエラー処理の重要な基礎を学ぶことができます。

ということで、 Go でシンプルなチャットを実装してみました。 (ソースコード)

以降、何を考えてどういう設計を採用したのかを解説していきます。

考慮すべきポイント

  • 特定のクライアントへの送信に長時間待たされた場合に、他のクライアントへの送信が遅れないようにする。
  • クライアントを切断するのは、 (1)ルーム側から kick する場合, (2)受信エラー, (3)送信エラー
    の3種類を考える必要がある。
  • 送信待ちのメッセージを無制限にバッファリングすると、メッセージを送信するだけで受信しないような
    クライアントがいるとメモリを使いきってしまうので、容易に DoS 攻撃が可能になってしまう。
    この場合メモリを使いきって OOM Killer が動くなどの被害を受ける。

Read のエラー処理

受信エラーだけでなく、送信エラーやサーバーからの kick などでも Read() している goroutine を止める必要があります。

Go の Read() は同期的な API になっているので、 select などを使って他の channel と並列に待つことはできません。 Read() 待ちの goroutine を止めるには、 Read() をエラーで停止させる必要があります。 net.Conn はスレッドセーフに作られているので、他の goroutine から conn.Close() を呼び出すことで Read() を止められます。

この場合、 Read() は io.EOF だけでなく、 closed connection から Read しようとしたという エラーも正常系で発生することになります。このエラーは io.EOF のように公開された変数が用意されて いるわけではありません。
ネットワークエラーだけをログに残したいと思うかもしれませんが、もともと Go の conn.Read() は 返すエラーの一覧を公開していないので、割りきって全て(あるいは EOF を除く全て)のエラーをログに残すようにしましょう。

他の goroutine から conn.Close() が呼ばれているかどうかを正確に判断する方法がないので、 client.Stop() を呼び出しておきます。この関数の処理は後述します。

func (c *Client) receiver() {
    defer c.Stop() // receiver が止まるときはかならず全体を止める.
    // 1行を512byteに制限する.
    reader := bufio.NewReaderSize(c.conn, 512)
    for {
        // Read() する goroutine を中断するのは難しい。
        // 外側で conn.Close() して、エラーで死ぬのが楽.
        // エラーは io.EOF であるとは限らない。
        msg, err := reader.ReadString(byte('\n'))
        if msg != "" {
            //log.Printf("receiver: Received %#v", msg)
            c.recv <- msg
        }
        if err != nil {
            log.Println("receiver: ", err)
            return
        }
    }
}

クライアントの切断処理

メッセージの送信中に他の goroutine から conn.Close() を呼び出すと 中途半端なメッセージを送る可能性があるので、 conn.Close() は送信 goroutine から行います。
そのために、 channel を利用して送信 goroutine に停止通知を行います。

今回のサンプルだと停止しないといけないのは送信 goroutine だけ (送信 goroutine が conn.Close() を呼ぶことで受信 goroutine も止まる)ですが、停止しないといけない goroutine が増えた場合にその分だけ channel を作らなければなりません。
また、クライアントの停止は複数の条件で、複数の goroutine から呼ばれる可能性があります。
停止通知に使うチャンネルへの送信がブロックするといけないので、次のようなコードになるでしょう。

type Client struct {
    // ...
    stopA chan bool,
    stopB chan bool,
    // ...
}

func newClient() *Client {
    c := &Client{
        //...
        // 1回はブロックせずに送信できるように、 buffered channel を使う.
        stopA: make(chan bool, 1),
        stopB: make(chan bool, 1),
        // ...
    }
    // ...
    return c
}

func (c *Client) Stop() {
    select {
    case c.stopA <- true:
    default:  // 既に停止メッセージを送信していてもブロックしない
    }
    select {
    case c.stopB <- true:
    default:
    }
    //...
}

これをもう少し楽にするために、 channel の close をメッセージ代わりに使います。
close() は同じチャンネルに対して複数回呼び出すと2回め以降は panic を起こすので、 recover を使って無視します。

func (c *Client) Stop() {
    // ちなみに defer recover() ではダメ
    defer func() {
        if r := recover(); r != nil {
            log.Println(r)
        }
    }()
    close(c.stop)
}

func (c *Client) sender() {
    defer func() {
        if err := c.conn.Close(); err != nil {
            log.Println(err)
        }
        log.Printf("%v is closed", c)
        clientWait.Done()  // graceful shutdown用. 後述
        c.closed <- c  // room のクライアント管理用. 後述
    }()
    for {
        select {
        case <-c.stop:
            //log.Println("sender: Received stop")
            return
        case msg := <-c.send:
            //log.Printf("sender: %#v", msg)
            _, err := c.conn.Write([]byte(msg))
            // net.Error.Temporary() をハンドルする必要は多分ない.
            // 書き込みできたバイト数が len(msg) より小さい場合は必ず err != nil なので、
            // Write() の第一戻り値は無視してエラーハンドリングだけを行う
            if err != nil {
                log.Println(err)
                return
            }
        }
    }
}

送信遅延の管理

受信側で1メッセージのサイズは制限しているので、送信待ちメッセージ数だけ制限すれば、 メモリを無制限に使うことは防げます。
送信 goroutine へメッセージを送る channel を buffered にすることで、 room goroutine からの送信を待たせないという効果も一緒に得られます。

バッファがいっぱいになった時に単にメッセージを drop すると、クライアントは何も知らずに 受信したメッセージが歯抜けになってしまうので、その場合はクライアントを切断します。

ただし、バッファがいっぱいになる条件はネットワークの送信が遅れた場合だけではありません。
CPUコアが1つしか無いのにたくさんのクライアントを扱う場合などに、メッセージの流量が多いと、 単に全てのクライアントの送信 goroutine に十分に実行権が行き渡ってないだけの可能性があります。

なので、バッファがいっぱいになった時に無制限に待つのは避けますが、固定時間は待つことにします。 goroutine の実行優先順位を制御できないので、あまりにクライアントが多いと待ち時間内に 待っているクライアントの送信 goroutine に実行権が与えられない可能性は残りますが、 それでも高負荷時の動作はかなり安定するはずです。

// Send msg to the client.
func (c *Client) Send(msg string) error {
    // 特定のクライアントが遅いときに、全体を遅くしたくないので、 select を使って
    // すぐに送信できない場合は諦める。
    // ただし、 room goroutine が sender goroutine より速く回ってるためにチャンネルの
    // バッファがいっぱいになってるだけの可能性もあるので、一定時間は待つ.
    // バッファに空きがあるときに時に time.After を生成するのは無駄なので、 select を2段にする.
    select {
    case c.send <- msg:
        return nil
    default:
        select {
        case c.send <- msg:
            return nil
        case <-time.After(time.Millisecond * 10):
            return errors.New("Can't send to client")
        }
    }
}

room のクライアント管理

ここまででクライアントの送受信部分はできてきましたが、クライアントが閉じた時に room 側が持ってる
クライアント一覧からも確実にそのクライアントを削除しないと、 channel が GC されずメモリリーク
してしまいます。

送信 goroutine で確実に conn.Close() を呼び出すようにしたので、そこで一緒に room に
メッセージを送るようにします。

room が持つ client 一覧は、 slice ではなく map にすることで、同時接続数が増えた時も
クライアント削除にかかる時間を一定にできます。

slice からの要素の削除は地味に罠があるので
(slice を縮めても背後の配列に参照が残ってるとGCされないのでリークするので縮める前に末尾へゼロ値を代入する必要がある)、
その点も delete() するだけで済む map の方が楽です。

func newRoom() *Room {
    r := &Room{
        Join:    make(chan *Client),
        Closed:  make(chan *Client),
        Recv:    make(chan string),
        Purge:   make(chan bool),
        Stop:    make(chan chan bool),
        clients: make(map[*Client]bool),
    }
    go r.run()
    return r
}

func (r *Room) run() {
    defer log.Println("Room closed.")
    for {
        select {
        case c := <-r.Join:
            log.Printf("Room: %v is joined", c)
            if err := r.sendLog(c); err != nil {
                log.Println(err)
                c.Stop()
            } else {
                r.clients[c] = true
            }
        case c := <-r.Closed: // c は停止済み.
            log.Printf("Room: %v has been closed", c)
            // delete は指定されたキーが無かったら何もしない
            delete(r.clients, c)
    /// ...
}

graceful shutdown

このチャットでは実装してませんが、例えばサーバー終了時に接続中のクライアントに終了メッセージを送信してから 接続を終了するといった仕様にしたくなる可能性もあります。 graceful shutdown の実装も練習してみましょう。

このチャットでは room を閉じてから、全クライアントの切断を待って、プロセスを終了するのが良さそうです。/> SIGINT (Ctrl-C で送られる) と SIGTERM (オプションなしの kill コマンドで送られる) で graceful shutdown しましょう。

go は main() 関数が終了すると、他にいくら goroutine が動いていてもプログラムが終了します。
なので、 main() 関数の最後で、シグナルを待って、 room に停止メッセージを送り、全クライアントの 切断を待つという処理を書きます。

クライアントの終了を待つのは sync.WaitGroup を利用するのが楽です。
クライアント生成時に wg.Add(1) を行い、送信 goroutine の conn.Close() 直後に wg.Done() を行い、 main 関数の最後では wg.Wait() を呼びます。

// main() の最後
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGUSR1, syscall.SIGTERM, os.Interrupt)
    for sig := range sigc {
        switch sig {
        case syscall.SIGUSR1:
            room.Purge <- true
        case syscall.SIGTERM, os.Interrupt:
            room.Stop <- true
            // 全ての client の sender を待つ
            clientWait.Wait()
            // おわり
            return
        }
    }
func (c *Client) sender() {
    defer func() {
        if err := c.conn.Close(); err != nil {
            log.Println(err)
        }
        log.Printf("%v is closed", c)
        clientWait.Done()
        c.closed <- c
    }()
//...

まとめ

チャットはシンプルに見えますが、送受信がバラバラに発生するので、リクエスト - レスポンス型のプログラムよりも 書くのが難しいです。
その分、チャットを正しく書けるようになれば、それ以外のサーバーの設計をするときに大いに参考になるはずです。

このプログラムは私なりに考えて設計したものですが、他の良いやり方を知っている・思いついた方は ぜひ教えてください。


@methane

songofacandy at 20:27│Comments(0)TrackBack(0)golang 

トラックバックURL

この記事にコメントする

名前:
URL:
  情報を記憶: 評価: 顔   
 
 
 
Blog内検索
Archives
このブログについて
DSASとは、KLab が構築し運用しているコンテンツサービス用のLinuxベースのインフラです。現在5ヶ所のデータセンタにて構築し、運用していますが、我々はDSASをより使いやすく、より安全に、そしてより省力で運用できることを目指して、日々改良に勤しんでいます。
このブログでは、そんな DSAS で使っている技術の紹介や、実験してみた結果の報告、トラブルに巻き込まれた時の経験談など、広く深く、色々な話題を織りまぜて紹介していきたいと思います。
最新コメント