ブロードキャストチャットアプリケーション

この演習では、新しく身につけた知識を使って、ブロードキャスト型のチャットアプリケーションを実装します。クライアントはチャットサーバーに接続し、自分のメッセージを送信します。クライアントは標準入力からユーザーメッセージを読み取り、それをサーバーへ送信します。チャットサーバーは、受信した各メッセージをすべてのクライアントにブロードキャストします。

このために、サーバー側では ブロードキャストチャネル を使用し、クライアントとサーバー間の通信には tokio_websockets を使用します。

新しい Cargo プロジェクトを作成し、次の依存関係を追加してください。

Cargo.toml:

[package]
name = "chat-async"
version = "0.1.0"
edition = "2024"

[dependencies]
futures-util = { version = "0.3.32", features = ["sink"] }
http = "1.4.0"
tokio = { version = "1.52.1", features = ["full"] }
tokio-websockets = { version = "0.13.2", features = ["client", "fastrand", "server", "sha1_smol"] }

必要な API

tokiotokio_websockets から次の関数が必要になります。時間をかけて API に慣れておいてください。

  • WebSocketStream に実装されている StreamExt::next(): WebSocket ストリームからメッセージを非同期に読み取るため。
  • WebSocketStream に実装されている SinkExt::send(): WebSocket ストリーム上でメッセージを非同期に送信するため。
  • Lines::next_line(): 標準入力からユーザーメッセージを非同期に読み取るため。
  • Sender::subscribe(): ブロードキャストチャネルを購読するため。

2 つのバイナリ

通常、Cargo プロジェクトでは 1 つのバイナリと 1 つの src/main.rs ファイルしか持てません。このプロジェクトでは、2 つのバイナリが必要です。1 つはクライアント用、もう 1 つはサーバー用です。これらを 2 つの別々の Cargo プロジェクトにすることもできますが、ここでは 2 つのバイナリを持つ 1 つの Cargo プロジェクトにまとめます。これを機能させるには、クライアントとサーバーのコードを src/bin の下に置く必要があります(ドキュメント を参照)。

次のサーバーコードとクライアントコードを、それぞれ src/bin/server.rssrc/bin/client.rs にコピーしてください。あなたのタスクは、以下の説明に従ってこれらのファイルを完成させることです。

src/bin/server.rs:

// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0

use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{Sender, channel};
use tokio_websockets::{Message, ServerBuilder, WebSocketStream};

async fn handle_connection(
    addr: SocketAddr,
    mut ws_stream: WebSocketStream<TcpStream>,
    bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {

    // TODO: ヒントについては、以下のタスクの説明を参照してください。

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("listening on port 2000");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection from {addr:?}");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // Wrap the raw TCP stream into a websocket.
            let (_req, ws_stream) = ServerBuilder::new().accept(socket).await?;

            handle_connection(addr, ws_stream, bcast_tx).await
        });
    }
}

src/bin/client.rs:

// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0

use futures_util::SinkExt;
use futures_util::stream::StreamExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};

#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    let (mut ws_stream, _) =
        ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
            .connect()
            .await?;

    let stdin = tokio::io::stdin();
    let mut stdin = BufReader::new(stdin).lines();


    // TODO: ヒントについては、以下のタスクの説明を参照してください。

}

バイナリの実行

サーバーは次のように実行します。

cargo run --bin server

クライアントは次のように実行します。

cargo run --bin client

タスク

  • src/bin/server.rshandle_connection 関数を実装してください。
    • ヒント: tokio::select! を使って、継続的なループ内で 2 つのタスクを並行して実行します。1 つのタスクはクライアントからメッセージを受信してブロードキャストします。もう 1 つはサーバーが受信したメッセージをクライアントへ送信します。
  • src/bin/client.rs の main 関数を完成させてください。
    • ヒント: これまでと同様に、tokio::select! を継続的なループ内で使って 2 つのタスクを並行して実行します: (1) 標準入力からユーザーメッセージを読み取ってサーバーへ送信すること、(2) サーバーからメッセージを受信してユーザーに表示すること。
  • 任意: 完了したら、メッセージの送信者以外のすべてのクライアントにメッセージをブロードキャストするようにコードを変更してください。