Python으로 간단한 채팅 프로그램 작성 튜토리얼
구현 아이디어
x01 서버 구축
우선, 서버 측에서 소켓은 메시지를 수락하는 데 사용됩니다. 소켓 요청이 수락될 때마다 새 스레드가 열려 메시지의 배포 및 수락을 관리합니다. 동시에 채팅방의 다양한 기능을 처리하기 위해 모든 스레드를 관리하는 핸들러가 있습니다
x02 클라이언트 설정
클라이언트 설정은 서버보다 훨씬 간단합니다. 클라이언트는 메시지를 보내고 특정 규칙에 따라 특정 문자를 입력하면 됩니다. 따라서 클라이언트 측에서는 두 개의 스레드만 사용하면 됩니다. 하나는 메시지 수신 전용입니다. other는 전송 전용입니다.
우리가 하나를 사용하지 않는 이유는 하나만 사용하면 메시지가 수신될 때 메시지를 받는 사람이 메시지를 보내기 전에 차단된 상태가 되기 때문입니다. 메시지 보내기도 마찬가지이므로 이 두 기능을 한 곳에 구현하면 계속 메시지를 주고받을 방법이 없게 됩니다
구현 방법
서버측 구현
import json import threading from socket import * from time import ctime class PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = ('', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print('等待客户连接...\r\n') try: while True: cs, caddr = self.__socket.accept() # 利用handler来管理线程,实现线程之间的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: pass class ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多线程管理会话 def run(self): try: print('...连接来自于:', self.__caddr) data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n' self.__cs.sendall(bytes(data, 'utf-8')) while True: data = self.__cs.recv(self.__buf).decode('utf-8') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close() class ChattingHandler: __help_str = "[ SYSTEM ]\r\n" \ "输入/ls,即可获得所有登陆用户信息\r\n" \ "输入/h,即可获得帮助\r\n" \ "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \ "输入/i,即可屏蔽群聊信息\r\n" \ "再次输入/i,即可取消屏蔽\r\n" \ "所有首字符为/的信息都不会发送出去" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的记录 nickname = "SOMEONE" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket与username之间的映射关系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += " " # 广播某玩家退出聊天室 self.broadcast_system_msg(nickname + "离开了PY_CHATTING") # 管理用户输入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js['type'] == "login": if js['msg'] not in self.__user_name_to_socket: if ' ' in js['msg']: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号不能够带有空格' }), cs) else: self.__user_name_to_socket[js['msg']] = cs self.__socket_to_user_name[cs] = js['msg'] self.__user_name_to_broadcast_state[js['msg']] = True self.send_to(json.dumps({ 'type': 'login', 'success': True, 'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)' }), cs) # 广播其他人,他已经进入聊天室 self.broadcast_system_msg(js['msg'] + "已经进入了聊天室") else: self.send_to(json.dumps({ 'type': 'login', 'success': False, 'msg': '账号已存在' }), cs) # 若玩家处于屏蔽模式,则无法发送群聊消息 elif js['type'] == "broadcast": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js['msg'], cs) else: self.send_to(json.dumps({ 'type': 'broadcast', 'msg': '屏蔽模式下无法发送群聊信息' }), cs) elif js['type'] == "ls": self.send_to(json.dumps({ 'type': 'ls', 'msg': self.get_all_login_user_info() }), cs) elif js['type'] == "help": self.send_to(json.dumps({ 'type': 'help', 'msg': self.__help_str }), cs) elif js['type'] == "sendto": self.single_chatting(cs, js['nickname'], js['msg']) elif js['type'] == "ignore": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = "通常模式" else: msg = "屏蔽模式" self.send_to(json.dumps({ 'type': 'ignore', 'success': True, 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg) }), cs) else: self.send_to({ 'type': 'ignore', 'success': False, 'msg': '切换失败' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ 'type': 'single', 'msg': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ 'type': 'single', 'msg': '该用户不存在' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = "[ SYSTEM ] ALIVE USER : \r\n" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + ",\r\n" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, 'utf-8')) def broadcast_system_msg(self, msg): data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg) js = json.dumps({ 'type': 'system_msg', 'msg': data }) # 屏蔽了群聊的玩家也可以获得系统的群发信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def broadcast(self, msg, cs): data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ 'type': 'broadcast', 'msg': data }) # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, 'utf-8')) def main(): server = PyChattingServer() server.start_session() main()
클라이언트측 구현
import json import threading from socket import * is_login = False is_broadcast = True class ClientReceiveThread(threading.Thread): __buf = 1024 def __init__(self, cs): super(ClientReceiveThread, self).__init__() self.__cs = cs def run(self): self.receive_msg() def receive_msg(self): while True: msg = self.__cs.recv(self.__buf).decode('utf-8') if not msg: break js = json.loads(msg) if js['type'] == "login": if js['success']: global is_login is_login = True print(js['msg']) elif js['type'] == "ignore": if js['success']: global is_broadcast if is_broadcast: is_broadcast = False else: is_broadcast = True print(js['msg']) else: if not is_broadcast: print("[现在处于屏蔽模式]") print(js['msg']) class ClientSendMsgThread(threading.Thread): def __init__(self, cs): super(ClientSendMsgThread, self).__init__() self.__cs = cs def run(self): self.send_msg() # 根据不同的输入格式来进行不同的聊天方式 def send_msg(self): while True: js = None msg = input() if not is_login: js = json.dumps({ 'type': 'login', 'msg': msg }) elif msg[0] == "@": data = msg.split(' ') if not data: print("请重新输入") break nickname = data[0] nickname = nickname.strip("@") if len(data) == 1: data.append(" ") js = json.dumps({ 'type': 'sendto', 'nickname': nickname, 'msg': data[1] }) elif msg == "/help": js = json.dumps({ 'type': 'help', 'msg': None }) elif msg == "/ls": js = json.dumps({ 'type': 'ls', 'msg': None }) elif msg == "/i": js = json.dumps({ 'type': 'ignore', 'msg': None }) else: if msg[0] != '/': js = json.dumps({ 'type': 'broadcast', 'msg': msg }) if js is not None: self.__cs.sendall(bytes(js, 'utf-8')) def main(): buf = 1024 # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了 address = ("127.0.0.1", 12231) cs = socket(AF_INET, SOCK_STREAM, 0) cs.connect(address) data = cs.recv(buf).decode("utf-8") if data: print(data) receive_thread = ClientReceiveThread(cs) receive_thread.start() send_thread = ClientSendMsgThread(cs) send_thread.start() while True: pass main()
위 내용은 Python으로 간단한 채팅 프로그램 작성 튜토리얼의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undress AI Tool
무료로 이미지를 벗다

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

PyoDBC 설치 : PipinStallPyODBC 명령을 사용하여 라이브러리를 설치하십시오. 2. SQLSERVER 연결 : PYODBC.connect () 메소드를 통해 드라이버, 서버, 데이터베이스, UID/PWD 또는 Trusted_Connection이 포함 된 연결 문자열을 사용하고 SQL 인증 또는 Windows 인증을 각각 지원합니다. 3. 설치된 드라이버를 확인하십시오 : pyodbc.drivers ()를 실행하고 'sqlserver'가 포함 된 드라이버 이름을 필터링하여 올바른 드라이버 이름이 'sqlserver 용 Odbcdriver17과 같은 올바른 드라이버 이름을 사용하는지 확인하십시오. 4. 연결 문자열의 키 매개 변수

pythontanbeoptimizedformemory-boundoperations는 Headgroughgenerations, 효율적 인 데이터 구조, 및 ManagingObjectLifetimes.first, usegeneratorsinsteadoflistStoprocessLargedAtasetSoneitematime, theintintomemory.second를 피하십시오

shutil.rmtree ()는 전체 디렉토리 트리를 재귀 적으로 삭제하는 파이썬의 함수입니다. 지정된 폴더와 모든 내용을 삭제할 수 있습니다. 1. 기본 사용법 : shutil.rmtree (Path)를 사용하여 디렉토리를 삭제하고 filenotfounderRor, AprismenterRor 및 기타 예외를 처리해야합니다. 2. 실제 응용 프로그램 : 임시 데이터 또는 캐시 디렉토리와 같은 한 번의 클릭으로 하위 디렉토리 및 파일을 포함하는 폴더를 지울 수 있습니다. 3. 참고 : 삭제 작업은 복원되지 않습니다. 경로가 존재하지 않을 때 filenotfounderror가 던져집니다. 권한이나 파일 직업으로 인해 실패 할 수 있습니다. 4. 선택적 매개 변수 : ingore_errors = true로 오류를 무시할 수 있습니다

psycopg2.pool.simpleconnectionpool을 사용하여 데이터베이스 연결을 효과적으로 관리하고 빈번한 연결 생성 및 파괴로 인한 성능 오버 헤드를 피하십시오. 1. 연결 풀을 만들 때 연결 풀이 성공적으로 초기화되도록 최소 및 최대 연결 및 데이터베이스 연결 매개 변수를 지정하십시오. 2. getConn ()을 통해 연결을 가져 와서 putconn ()을 사용하여 데이터베이스 작업을 실행 한 후 풀에 연결을 반환하십시오. 끊임없이 Conn.Close () 호출 금지됩니다. 3. SimpleConnectionPool은 스레드 안전이며 다중 스레드 환경에 적합합니다. 4. 예외가 표시 될 때 연결을 올바르게 반환 할 수 있도록 컨텍스트 관리자와 함께 컨텍스트 관리자를 구현하는 것이 좋습니다.

iter ()는 반복자 객체를 얻는 데 사용되며 다음 ()은 다음 요소를 얻는 데 사용됩니다. 1. iterator ()를 사용하여 목록과 같은 반복 가능한 객체를 반복자로 변환합니다. 2. 다음 ()을 호출하여 요소를 하나씩 얻고 요소가 소진 될 때 트리거 스톱 레이션 예외; 3. 다음 (반복자, 기본값)을 사용하여 예외를 피하십시오. 4. 커스텀 반복자는 반복 로직을 제어하기 위해 __iter __ () 및 __next __ () 메소드를 구현해야합니다. 기본값을 사용하는 것은 안전한 트래버스를위한 일반적인 방법이며 전체 메커니즘은 간결하고 실용적입니다.

통계 중재 통계 중 차익 거래 소개는 수학적 모델을 기반으로 금융 시장에서 가격 불일치를 포착하는 거래 방법입니다. 핵심 철학은 평균 회귀에서 비롯된 것, 즉 자산 가격이 단기적으로 장기 추세에서 벗어날 수 있지만 결국 역사적 평균으로 돌아갈 것입니다. 거래자는 통계적 방법을 사용하여 자산 간의 상관 관계를 분석하고 일반적으로 동기식으로 변경되는 포트폴리오를 찾습니다. 이러한 자산의 가격 관계가 비정상적으로 벗어나면 차익 거래 기회가 발생합니다. cryptocurrency 시장에서 통계적 차익 거래는 특히 시장 자체의 비 효율성과 급격한 변동으로 인해 널리 퍼져 있습니다. 기존 금융 시장과 달리 암호 화폐는 24 시간 내내 운영되며 가격은 뉴스, 소셜 미디어 감정 및 기술 업그레이드에 매우 취약합니다. 이 일정한 가격 변동은 종종 가격 책정 편견을 만들고 중재자를 제공합니다.

해당 데이터베이스 드라이버를 설치하십시오. 2. Connect ()를 사용하여 데이터베이스에 연결하십시오. 3. 커서 객체를 만듭니다. 4. Execute () 또는 Executemany ()를 사용하여 SQL을 실행하고 매개 변수화 된 쿼리를 사용하여 주입을 방지하십시오. 5. 결과를 얻으려면 fetchall () 등을 사용하십시오. 6. 수정 후 Commit ()가 필요합니다. 7. 마지막으로 연결을 닫거나 컨텍스트 관리자를 사용하여 자동으로 처리하십시오. 완전한 프로세스는 SQL 작업이 안전하고 효율적임을 보장합니다.

파이썬 가상 환경을 만들려면 VenV 모듈을 사용할 수 있습니다. 단계는 다음과 같습니다. 1. 프로젝트 디렉토리를 입력하여 환경을 만들기 위해 Python-Mvenvenv 환경을 실행하십시오. 2. Sourceenv/bin/활성화 Mac/Linux 및 Env \ Scripts \ Windows로 활성화; 3. PipinStall 설치 패키지, PipFreeze> 요구 사항을 사용하여 종속성을 내보내십시오. 4. 가상 환경을 GIT에 제출하지 않도록주의하고 설치 중에 올바른 환경에 있는지 확인하십시오. 가상 환경은 프로젝트 종속성을 분리하여 충돌을 방지 할 수 있습니다. 특히 다중 프로젝트 개발에 적합합니다. Pycharm 또는 VScode와 같은 편집자도 있습니다.
