Flask 기반 종단간 암호화 메신저 코드 분석 및 설명
Flask와 Python을 이용해 구현된 종단간 암호화(E2EE) 메신저의 전체 구조에 대해 설명하겠습니다.
프로젝트 개요
이 메신저는 Flask 웹 프레임워크와 Flask-SocketIO를 기반으로 하며, RSA와 AES 알고리즘을 조합한 종단간 암호화를 지원합니다. 사용자 인증, 1:1 비밀 채팅, 공개 채팅, 회원가입 등 기본적인 메신저 기능을 모두 갖추고 있습니다
사용된 파이썬 라이브러리 및 환경
Flask: 웹 서버 및 라우팅
Flask-SocketIO: 실시간 채팅 구현
PyMySQL, SQLAlchemy: MySQL 데이터베이스 연동
bcrypt: 비밀번호 해시 처리
PyCryptodome: RSA, AES 암호화/복호화
Jinja2: 템플릿 렌더링
사용된 서버 코드
```import socket import threading import pymysql import bcrypt from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_OAEP from flask import Flask, request, render_template, redirect, session, url_for from sqlalchemy import create_engine, text from flask_socketio import SocketIO, send, join_room, emit
app = Flask(name) app.config.from_pyfile(‘config.py’) socketio = SocketIO(app, manage_session=True)
database = create_engine(app.config[‘DB_URL’], pool_pre_ping=True, pool_recycle=3600, echo=True)
def encrypt_message(message, public_key): # 메시지를 공개키로 암호화 cipher = PKCS1_OAEP.new(public_key) return cipher.encrypt(message.encode())
def decrypt_message(encrypted_message, private_key): # 암호화된 메시지를 개인키로 복호화 cipher = PKCS1_OAEP.new(private_key) return cipher.decrypt(encrypted_message).decode()
def make_key(): # RSA 공개키 및 개인키 생성 pr_key = RSA.generate(1024) pu_key = pr_key.public_key() return pr_key, pu_key
def get_login(user_id): # :id는 플레이스 홀더 역할로 값을 넣을 자리를 표시 with database.connect() as conn: query = text(“SELECT id,passwd FROM PRIVATE WHERE id = :id”) result = conn.execute(query, {“id”: user_id}).fetchone() if result: return {“id”: result[0], “passwd”: result[1]} else: return
def get_duplication(user_id): query = text(“SELECT id FROM USERINFO WHERE id = :id”) with database.connect() as conn: result = conn.execute(query, {“id”: user_id}).fetchone() if result: return {“id”: result[0]} else: return
@app.route(‘/’) def index(): return render_template(‘index.html’)
@app.route(‘/login’, methods=[‘GET’, ‘POST’]) def sign_in(): if ‘username’ in session: return redirect(‘/’) if request.method == ‘POST’: user_id = request.form.get(“user_id”) password = request.form.get(“password”) userinfo = get_login(user_id) encode_pw = password.encode(“utf-8”) if userinfo: encode_ckpw = userinfo[“passwd”] if bcrypt.checkpw(encode_pw, encode_ckpw): session[“username”] = user_id return redirect(‘/’) else: result = “아이디 혹은 비밀번호가 다릅니다.” return render_template(‘login_result.html’, result=result) else: result = “아이디 혹은 비밀번호가 다릅니다.” return render_template(‘login_result.html’, result=result) return render_template(‘login.html’)
@app.route(‘/logout’) def logout(): session.pop(‘username’, None) return redirect(‘/’)
@app.route(‘/register’, methods=[‘GET’, ‘POST’]) def sign_up(): if request.method == ‘POST’: user_name = request.form.get(“user_name”) user_id = request.form.get(“user_id”) password = request.form.get(“password”) b_password = bytes(password, “utf-8”) b_hashed_password = bcrypt.hashpw(password=b_password, salt=bcrypt.gensalt())
duplication = get_duplication(user_id)
if duplication:
return "사용할 수 없는 아이디입니다."
private_key, public_key = make_key()
pri_str = private_key.export_key().decode()
pub_str = public_key.export_key().decode()
try:
uquery = text("""INSERT INTO USERINFO(name,id,pub_key) VALUES (:name,:id,:pub_key)""")
with database.connect() as conn:
conn.execute(uquery, {"name": user_name, "id": user_id, "pub_key": pub_str})
conn.commit()
kquery = text("""INSERT INTO PRIVATE(id,passwd,pri_key) VALUES (:id,:passwd,:pri_key)""")
with database.connect() as conn:
conn.execute(kquery, {"id": user_id, "passwd": b_hashed_password, "pri_key": pri_str})
conn.commit()
result = "회원가입 성공"
return render_template('register_result.html', result=result)
except Exception as e:
print("에러 발생:", e)
result = "회원가입 실패"
return render_template('register_result.html', result=result)
return render_template('register.html')
@app.route(‘/openchat’) def open_chat(): query = text(“"”SELECT chat FROM ACHAT ORDER BY num ASC”””) with database.connect() as conn: result = conn.execute(query).fetchall() chats = [{“chat”: row[0]} for row in result] return render_template(‘webchat.html’, chats=chats)
@socketio.on(‘open_message’) def handle_message(msg): query = text(“"”INSERT INTO ACHAT(chat) VALUES(:chat)”””) with database.connect() as conn: conn.execute(query, {“chat”: msg}) conn.commit() send(msg, broadcast=True)
@app.route(‘/chat’, methods=[‘GET’, ‘POST’]) def lobby(): if request.method == ‘POST’: user1 = session.get(‘username’) user2 = request.form.get(‘user_id’) # 방 이름을 알파벳순으로 고정해 충돌 방지 room_users = sorted([user1, user2]) room_name = f”{room_users[0]}_{room_users[1]}” return redirect(url_for(‘private_chat’, room_name=room_name)) else: query = text(“"”SELECT id FROM USERINFO”””) with database.connect() as conn: result = conn.execute(query).fetchall() users = [] for row in result: users.append(row[0]) uname = session.get(‘username’) if not uname: return render_template(‘error.html’, alert_msg=”로그인이 필요합니다”) return render_template(‘chat.html’, users=users, username=session[‘username’])
@app.route(‘/chat/
user = session.get('username')
if not user:
return redirect(url_for('login'))
# 사용자가 방에 속하지 않으면 접근 금지
if user not in room_name:
return redirect(url_for('index'))
# 방 이름(room_name)에서 나(me)와 상대방(other)을 분리
users = room_name.split("_")
if users[0] == user:
other_user = users[1]
else:
other_user = users[0]
# 상대방의 공개키(pub_key) 조회
pub_query = text("SELECT pub_key FROM USERINFO WHERE id = :id")
with database.connect() as conn:
pub_result = conn.execute(pub_query, {"id": other_user}).fetchone()
if not pub_result:
return "상대방 공개키를 찾을 수 없습니다.", 500
recipient_pub_key = pub_result[0]
# 내 개인키(pri_key) 조회
pri_query = text("SELECT pri_key FROM `PRIVATE` WHERE id = :id")
with database.connect() as conn:
pri_result = conn.execute(pri_query, {"id": user}).fetchone()
if not pri_result:
return "내 개인키를 찾을 수 없습니다.", 500
my_private_key = pri_result[0]
return render_template(
'private_chat.html',
room=room_name,
username=user,
chats=chats,
recipient_pub_key=recipient_pub_key,
my_private_key=my_private_key
)
@socketio.on(‘join’) def join(room_name): username = session.get(‘username’) print(f”{username}님이 {room_name} 방에 입장”) if username in room_name: join_room(room_name)
@socketio.on(‘private_message’) def handle_message(data): “”” 클라이언트에서 전송된 데이터를 통해: - encrypted_message: AES-256-CBC로 암호화된 메시지 문자열 (IV:CipherText 형태) - encrypted_key: RSA로 암호화된 AES 대칭키(Base64 문자열) - room: 방 이름 - sender: 보내는 사용자의 ID “”” print(f”[private_message] data received: {data}”) room_name = data.get(“room”) encrypted_message = data.get(“encrypted_message”) encrypted_key = data.get(“encrypted_key”) username = data.get(“sender”)
if not all([room_name, encrypted_message, encrypted_key, username]):
print("필수 데이터 누락")
return
query = text("""
INSERT INTO CHAT(chat, aes_key, room, sender)
VALUES (:chat, :aes_key, :room, :sender)
""")
with database.begin() as conn:
conn.execute(query, {
"chat": encrypted_message,
"aes_key": encrypted_key,
"room": room_name,
"sender": username
})
emit('private_message', {
'sender': username,
'encrypted_message': encrypted_message,
'encrypted_key': encrypted_key
}, to=room_name)
if name == ‘main’: socketio.run(app, host=’0.0.0.0’, port=5000, debug=True)
```
데이터베이스 구조
USERINFO: id, name, pub_key(공개키)
PRIVATE: id, passwd(해시 비번), pri_key(개인키)
CHAT: sender, chat(암호문), aes_key(암호화된 대칭키), room
ACHAT: 공개 채팅 메시지 저장.
보안상 특징과 한계
장점
종단간 암호화: 서버가 메시지 내용을 복호화할 수 없음
비밀번호 해시: bcrypt로 안전하게 저장
한계 및 개선점
RSA 1024비트: 현대 표준 미달, 2048비트 이상 권장
세션 쿠키: 평문 저장, Secure/HttpOnly/SameSite 옵션 필수
TLS 미적용: 네트워크 감청, 세션 하이재킹 위험
XSS 방어 미흡: 쿠키 탈취 가능성