目次
実装アイデア
ホームページ バックエンド開発 Python チュートリアル Python で簡単なチャット プログラムを作成するためのチュートリアル

Python で簡単なチャット プログラムを作成するためのチュートリアル

May 08, 2023 pm 06:37 PM
python アプレット

実装アイデア

x01 サーバーの確立

まず、サーバー側でソケットを使用してメッセージを受け取ります。ソケットがオープンされ、メッセージの配信と受信を新しいスレッドで管理すると同時に、すべてのスレッドを管理するハンドラーが存在し、チャット ルームのさまざまな機能の処理を実現します

#x02 クライアントの構築

クライアントの構築はサーバーに比べて非常に簡単で、クライアントの機能はメッセージの送受信と、特定の文字を特定の規則に従って入力することだけです。したがって、クライアント側では 2 つのスレッドを使用するだけで済み、1 つはメッセージの受信専用、もう 1 つはメッセージの送信専用になります。 , 1 つだけを使用する場合、メッセージを受信した後、メッセージを受信した側は送信する前にブロック状態になります。同様に、メッセージの送信についても同じことが当てはまります。これら 2 つの関数が 1 か所に実装されている場合、

実装方法

##サーバーサイド実装

##

import json
import threading
from socket import *
from time import ctime


class PyChattingServer:
    __socket = socket(AF_INET, SOCK_STREAM, 0)
    __address = ('', 12231)

    __buf = 1024

    def __init__(self):
        self.__socket.bind(self.__address)
        self.__socket.listen(20)
        self.__msg_handler = ChattingHandler()

    def start_session(self):
        print('等待客户连接...\r\n')
        try:
            while True:
                cs, caddr = self.__socket.accept()
                # 利用handler来管理线程,实现线程之间的socket的相互通信
                self.__msg_handler.start_thread(cs, caddr)
        except socket.error:
            pass


class ChattingThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs, caddr, msg_handler):
        super(ChattingThread, self).__init__()
        self.__cs = cs
        self.__caddr = caddr
        self.__msg_handler = msg_handler

    # 使用多线程管理会话
    def run(self):
        try:
            print('...连接来自于:', self.__caddr)
            data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n'
            self.__cs.sendall(bytes(data, 'utf-8'))
            while True:
                data = self.__cs.recv(self.__buf).decode('utf-8')
                if not data:
                    break
                self.__msg_handler.handle_msg(data, self.__cs)
                print(data)
        except socket.error as e:
            print(e.args)
            pass
        finally:
            self.__msg_handler.close_conn(self.__cs)
            self.__cs.close()


class ChattingHandler:
    __help_str = "[ SYSTEM ]\r\n" \
                 "输入/ls,即可获得所有登陆用户信息\r\n" \
                 "输入/h,即可获得帮助\r\n" \
                 "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
                 "输入/i,即可屏蔽群聊信息\r\n" \
                 "再次输入/i,即可取消屏蔽\r\n" \
                 "所有首字符为/的信息都不会发送出去"

    __buf = 1024
    __socket_list = []

    __user_name_to_socket = {}
    __socket_to_user_name = {}

    __user_name_to_broadcast_state = {}

    def start_thread(self, cs, caddr):
        self.__socket_list.append(cs)
        chat_thread = ChattingThread(cs, caddr, self)
        chat_thread.start()

    def close_conn(self, cs):
        if cs not in self.__socket_list:
            return
        # 去除socket的记录
        nickname = "SOMEONE"
        if cs in self.__socket_list:
            self.__socket_list.remove(cs)
        # 去除socket与username之间的映射关系
        if cs in self.__socket_to_user_name:
            nickname = self.__socket_to_user_name[cs]
            self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
            self.__socket_to_user_name.pop(cs)
            self.__user_name_to_broadcast_state.pop(nickname)
        nickname += " "
        # 广播某玩家退出聊天室
        self.broadcast_system_msg(nickname + "离开了PY_CHATTING")

    # 管理用户输入的信息
    def handle_msg(self, msg, cs):
        js = json.loads(msg)
        if js['type'] == "login":
            if js['msg'] not in self.__user_name_to_socket:
                if ' ' in js['msg']:
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': False,
                        'msg': '账号不能够带有空格'
                    }), cs)
                else:
                    self.__user_name_to_socket[js['msg']] = cs
                    self.__socket_to_user_name[cs] = js['msg']
                    self.__user_name_to_broadcast_state[js['msg']] = True
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': True,
                        'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
                    }), cs)
                    # 广播其他人,他已经进入聊天室
                    self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")
            else:
                self.send_to(json.dumps({
                    'type': 'login',
                    'success': False,
                    'msg': '账号已存在'
                }), cs)
        # 若玩家处于屏蔽模式,则无法发送群聊消息
        elif js['type'] == "broadcast":
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                self.broadcast(js['msg'], cs)
            else:
                self.send_to(json.dumps({
                    'type': 'broadcast',
                    'msg': '屏蔽模式下无法发送群聊信息'
                }), cs)
        elif js['type'] == "ls":
            self.send_to(json.dumps({
                'type': 'ls',
                'msg': self.get_all_login_user_info()
            }), cs)
        elif js['type'] == "help":
            self.send_to(json.dumps({
                'type': 'help',
                'msg': self.__help_str
            }), cs)
        elif js['type'] == "sendto":
            self.single_chatting(cs, js['nickname'], js['msg'])
        elif js['type'] == "ignore":
            self.exchange_ignore_state(cs)

    def exchange_ignore_state(self, cs):
        if cs in self.__socket_to_user_name:
            state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
            if state:
                state = False
            else:
                state = True
            self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
            self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                msg = "通常模式"
            else:
                msg = "屏蔽模式"
            self.send_to(json.dumps({
                'type': 'ignore',
                'success': True,
                'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
            }), cs)
        else:
            self.send_to({
                'type': 'ignore',
                'success': False,
                'msg': '切换失败'
            }, cs)

    def single_chatting(self, cs, nickname, msg):
        if nickname in self.__user_name_to_socket:
            msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
                ctime(), self.__socket_to_user_name[cs], nickname, msg)
            self.send_to_list(json.dumps({
                'type': 'single',
                'msg': msg
            }), self.__user_name_to_socket[nickname], cs)
        else:
            self.send_to(json.dumps({
                'type': 'single',
                'msg': '该用户不存在'
            }), cs)
        print(nickname)

    def send_to_list(self, msg, *cs):
        for i in range(len(cs)):
            self.send_to(msg, cs[i])

    def get_all_login_user_info(self):
        login_list = "[ SYSTEM ] ALIVE USER : \r\n"
        for key in self.__socket_to_user_name:
            login_list += self.__socket_to_user_name[key] + ",\r\n"
        return login_list

    def send_to(self, msg, cs):
        if cs not in self.__socket_list:
            self.__socket_list.append(cs)
        cs.sendall(bytes(msg, 'utf-8'))

    def broadcast_system_msg(self, msg):
        data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
        js = json.dumps({
            'type': 'system_msg',
            'msg': data
        })
        # 屏蔽了群聊的玩家也可以获得系统的群发信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))

    def broadcast(self, msg, cs):
        data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
        js = json.dumps({
            'type': 'broadcast',
            'msg': data
        })
        # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name \
                    and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))


def main():
    server = PyChattingServer()
    server.start_session()


main()
Python で簡単なチャット プログラムを作成するためのチュートリアルクライアント側の実装

Python で簡単なチャット プログラムを作成するためのチュートリアル

import json
import threading
from socket import *

is_login = False
is_broadcast = True


class ClientReceiveThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs):
        super(ClientReceiveThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.receive_msg()

    def receive_msg(self):
        while True:
            msg = self.__cs.recv(self.__buf).decode('utf-8')
            if not msg:
                break
            js = json.loads(msg)
            if js['type'] == "login":
                if js['success']:
                    global is_login
                    is_login = True
                print(js['msg'])
            elif js['type'] == "ignore":
                if js['success']:
                    global is_broadcast
                    if is_broadcast:
                        is_broadcast = False
                    else:
                        is_broadcast = True
                print(js['msg'])
            else:
                if not is_broadcast:
                    print("[现在处于屏蔽模式]")
                print(js['msg'])


class ClientSendMsgThread(threading.Thread):

    def __init__(self, cs):
        super(ClientSendMsgThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.send_msg()

    # 根据不同的输入格式来进行不同的聊天方式
    def send_msg(self):
        while True:
            js = None
            msg = input()
            if not is_login:
                js = json.dumps({
                    'type': 'login',
                    'msg': msg
                })
            elif msg[0] == "@":
                data = msg.split(' ')
                if not data:
                    print("请重新输入")
                    break
                nickname = data[0]
                nickname = nickname.strip("@")
                if len(data) == 1:
                    data.append(" ")
                js = json.dumps({
                    'type': 'sendto',
                    'nickname': nickname,
                    'msg': data[1]
                })
            elif msg == "/help":
                js = json.dumps({
                    'type': 'help',
                    'msg': None
                })
            elif msg == "/ls":
                js = json.dumps({
                    'type': 'ls',
                    'msg': None
                })
            elif msg == "/i":
                js = json.dumps({
                    'type': 'ignore',
                    'msg': None
                })
            else:
                if msg[0] != '/':
                    js = json.dumps({
                        'type': 'broadcast',
                        'msg': msg
                    })
            if js is not None:
                self.__cs.sendall(bytes(js, 'utf-8'))


def main():
    buf = 1024
    # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了
    address = ("127.0.0.1", 12231)
    cs = socket(AF_INET, SOCK_STREAM, 0)
    cs.connect(address)
    data = cs.recv(buf).decode("utf-8")
    if data:
        print(data)
    receive_thread = ClientReceiveThread(cs)
    receive_thread.start()
    send_thread = ClientSendMsgThread(cs)
    send_thread.start()
    while True:
        pass


main()

以上がPython で簡単なチャット プログラムを作成するためのチュートリアルの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

python shotil rmtreeの例 python shotil rmtreeの例 Aug 01, 2025 am 05:47 AM

shutil.rmtree()は、ディレクトリツリー全体を再帰的に削除するPythonの関数です。指定されたフォルダーとすべてのコンテンツを削除できます。 1.基本的な使用法:shutil.rmtree(PATH)を使用してディレクトリを削除すると、FilenotFounderror、PermissionError、その他の例外を処理する必要があります。 2。実用的なアプリケーション:一時的なデータやキャッシュディレクトリなど、サブディレクトリとファイルを1回クリックして含むフォルダーをクリアできます。 3。注:削除操作は復元されません。 FilenotFounderrorは、パスが存在しない場合に投げられます。許可またはファイル職業のために失敗する可能性があります。 4.オプションのパラメーター:INGRORE_ERRORS = trueでエラーを無視できます

Pythonで仮想環境を作成する方法 Pythonで仮想環境を作成する方法 Aug 05, 2025 pm 01:05 PM

Python仮想環境を作成するには、VENVモジュールを使用できます。手順は次のとおりです。1。プロジェクトディレクトリを入力して、python-mvenvenv環境を実行して環境を作成します。 2。SourceENV/bin/Activate to Mac/LinuxおよびEnv \ Scripts \ Windowsにアクティブ化します。 3. PIPINSTALLインストールパッケージ、PIPFREEZE> RECUMESSION.TXTを使用して、依存関係をエクスポートします。 4.仮想環境をGITに提出しないように注意し、設置中に正しい環境にあることを確認してください。仮想環境は、特にマルチプロジェクト開発に適した競合を防ぐためにプロジェクト依存関係を分離でき、PycharmやVSCodeなどの編集者も

PythonでSQLクエリを実行する方法は? PythonでSQLクエリを実行する方法は? Aug 02, 2025 am 01:56 AM

対応するデータベースドライバーをインストールします。 2。CONNECT()を使用してデータベースに接続します。 3.カーソルオブジェクトを作成します。 4。Execute()またはexecuteMany()を使用してSQLを実行し、パラメーター化されたクエリを使用して噴射を防ぎます。 5。Fetchall()などを使用して結果を得る。 6。COMMING()は、変更後に必要です。 7.最後に、接続を閉じるか、コンテキストマネージャーを使用して自動的に処理します。完全なプロセスにより、SQL操作が安全で効率的であることが保証されます。

Pythonの複数のプロセス間でデータを共有する方法は? Pythonの複数のプロセス間でデータを共有する方法は? Aug 02, 2025 pm 01:15 PM

MultiProcessing.Queueを使用して、複数のプロセスと消費者のシナリオに適した複数のプロセス間でデータを安全に渡す。 2。MultiProcessing.Pipeを使用して、2つのプロセス間の双方向の高速通信を実現しますが、2点接続のみ。 3.値と配列を使用して、シンプルなデータ型を共有メモリに保存し、競争条件を回避するためにロックで使用する必要があります。 4.マネージャーを使用して、リストや辞書などの複雑なデータ構造を共有します。これらは非常に柔軟ですが、パフォーマンスが低く、複雑な共有状態を持つシナリオに適しています。データサイズ、パフォーマンス要件、複雑さに基づいて適切な方法を選択する必要があります。キューとマネージャーは、初心者に最適です。

Python boto3 S3アップロード例 Python boto3 S3アップロード例 Aug 02, 2025 pm 01:08 PM

BOTO3を使用してファイルをS3にアップロードしてBOTO3を最初にインストールし、AWS資格情報を構成します。 2。boto3.client( 's3')を介してクライアントを作成し、upload_file()メソッドを呼び出してローカルファイルをアップロードします。 3. S3_Keyをターゲットパスとして指定し、指定されていない場合はローカルファイル名を使用できます。 4. filenotfounderror、nocredentialserror、clienterrorなどの例外を処理する必要があります。 5。ACL、ContentType、StorageClass、Metadataは、exrceargsパラメーターを介して設定できます。 6。メモリデータについては、bytesioを使用して単語を作成できます

Pythonのリストを使用してスタックデータ構造を実装する方法は? Pythonのリストを使用してスタックデータ構造を実装する方法は? Aug 03, 2025 am 06:45 AM

pythonlistscani実装Append()penouspop()popoperations.1.useappend()2つのBelief stotetopthestack.2.usep op()toremoveandreturnthetop要素、保証済みのtocheckeckeckestackisnotemptoavoidindexerror.3.pekattehatopelementwithstack [-1]

Pythonでの弱い参照とは何ですか?いつ使用する必要がありますか? Pythonでの弱い参照とは何ですか?いつ使用する必要がありますか? Aug 01, 2025 am 06:19 AM

weadReferencexisttoalowrecerencing objectswithoutpreventing theirgarbagecollection、helpingmemoryLeaksandcularReferences.1.useweakkeydiction aryorweakvaluedictionaryforforcacheSompapingStoleTunusedOunusedObjects.becolted.2.2

Pythonスケジュールライブラリの例 Pythonスケジュールライブラリの例 Aug 04, 2025 am 10:33 AM

Pythonscheduleライブラリを使用して、タイミングタスクを簡単に実装します。まず、PipinstallScheduleを介してライブラリをインストールし、スケジュールモジュールと時間モジュールをインポートし、定期的に実行する必要がある関数を定義し、スケジュールを使用して時間間隔を設定してタスク関数を結合します。最後に、スケジュールを呼び出してください。たとえば、10秒ごとにタスクを実行すると、スケジュールとして記述できます。すべて(10).seconds.do(job)。数分、数時間、日、週などをサポートし、特定のタスクを指定することもできます。

See all articles