Branding, UI Design IT, 메신저, 프로젝트 / 2025-05-14 / by 홍재모

암호화 메신저 프로젝트

Flask 기반 종단간 암호화 메신저 코드 분석 및 설명

Flask와 Python을 이용해 구현된 종단간 암호화(E2EE) 메신저의 전체 구조에 대해 설명하겠습니다.

프로젝트 개요

이 메신저는 Flask 웹 프레임워크와 Flask-SocketIO를 기반으로 하며, RSA와 AES 알고리즘을 조합한 종단간 암호화를 지원합니다. 사용자 인증, 1:1 비밀 채팅, 공개 채팅, 회원가입 등 기본적인 메신저 기능을 모두 갖추고 있습니다

사용된 파이썬 라이브러리 및 환경

Flask: 웹 서버 및 라우팅

Flask-SocketIO: 실시간 채팅 구현

PyMySQL, SQLAlchemy: MySQL 데이터베이스 연동

bcrypt: 비밀번호 해시 처리

PyCryptodome: RSA, AES 암호화/복호화

Jinja2: 템플릿 렌더링

사용된 서버 코드

```import socket import threading import pymysql import bcrypt from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_OAEP from flask import Flask, request, render_template, redirect, session, url_for from sqlalchemy import create_engine, text from flask_socketio import SocketIO, send, join_room, emit

app = Flask(name) app.config.from_pyfile(‘config.py’) socketio = SocketIO(app, manage_session=True)

database = create_engine(app.config[‘DB_URL’], pool_pre_ping=True, pool_recycle=3600, echo=True)

def encrypt_message(message, public_key): # 메시지를 공개키로 암호화 cipher = PKCS1_OAEP.new(public_key) return cipher.encrypt(message.encode())

def decrypt_message(encrypted_message, private_key): # 암호화된 메시지를 개인키로 복호화 cipher = PKCS1_OAEP.new(private_key) return cipher.decrypt(encrypted_message).decode()

def make_key(): # RSA 공개키 및 개인키 생성 pr_key = RSA.generate(1024) pu_key = pr_key.public_key() return pr_key, pu_key

def get_login(user_id): # :id는 플레이스 홀더 역할로 값을 넣을 자리를 표시 with database.connect() as conn: query = text(“SELECT id,passwd FROM PRIVATE WHERE id = :id”) result = conn.execute(query, {“id”: user_id}).fetchone() if result: return {“id”: result[0], “passwd”: result[1]} else: return

def get_duplication(user_id): query = text(“SELECT id FROM USERINFO WHERE id = :id”) with database.connect() as conn: result = conn.execute(query, {“id”: user_id}).fetchone() if result: return {“id”: result[0]} else: return

@app.route(‘/’) def index(): return render_template(‘index.html’)

@app.route(‘/login’, methods=[‘GET’, ‘POST’]) def sign_in(): if ‘username’ in session: return redirect(‘/’) if request.method == ‘POST’: user_id = request.form.get(“user_id”) password = request.form.get(“password”) userinfo = get_login(user_id) encode_pw = password.encode(“utf-8”) if userinfo: encode_ckpw = userinfo[“passwd”] if bcrypt.checkpw(encode_pw, encode_ckpw): session[“username”] = user_id return redirect(‘/’) else: result = “아이디 혹은 비밀번호가 다릅니다.” return render_template(‘login_result.html’, result=result) else: result = “아이디 혹은 비밀번호가 다릅니다.” return render_template(‘login_result.html’, result=result) return render_template(‘login.html’)

@app.route(‘/logout’) def logout(): session.pop(‘username’, None) return redirect(‘/’)

@app.route(‘/register’, methods=[‘GET’, ‘POST’]) def sign_up(): if request.method == ‘POST’: user_name = request.form.get(“user_name”) user_id = request.form.get(“user_id”) password = request.form.get(“password”) b_password = bytes(password, “utf-8”) b_hashed_password = bcrypt.hashpw(password=b_password, salt=bcrypt.gensalt())

    duplication = get_duplication(user_id)
    if duplication:
        return "사용할 수 없는 아이디입니다."

    private_key, public_key = make_key()
    pri_str = private_key.export_key().decode()
    pub_str = public_key.export_key().decode()

    try:
        uquery = text("""INSERT INTO USERINFO(name,id,pub_key) VALUES (:name,:id,:pub_key)""")
        with database.connect() as conn:
            conn.execute(uquery, {"name": user_name, "id": user_id, "pub_key": pub_str})
            conn.commit()
        kquery = text("""INSERT INTO PRIVATE(id,passwd,pri_key) VALUES (:id,:passwd,:pri_key)""")
        with database.connect() as conn:
            conn.execute(kquery, {"id": user_id, "passwd": b_hashed_password, "pri_key": pri_str})
            conn.commit()

        result = "회원가입 성공"
        return render_template('register_result.html', result=result)
    except Exception as e:
        print("에러 발생:", e)
        result = "회원가입 실패"
        return render_template('register_result.html', result=result)
return render_template('register.html')

@app.route(‘/openchat’) def open_chat(): query = text(“"”SELECT chat FROM ACHAT ORDER BY num ASC”””) with database.connect() as conn: result = conn.execute(query).fetchall() chats = [{“chat”: row[0]} for row in result] return render_template(‘webchat.html’, chats=chats)

@socketio.on(‘open_message’) def handle_message(msg): query = text(“"”INSERT INTO ACHAT(chat) VALUES(:chat)”””) with database.connect() as conn: conn.execute(query, {“chat”: msg}) conn.commit() send(msg, broadcast=True)

@app.route(‘/chat’, methods=[‘GET’, ‘POST’]) def lobby(): if request.method == ‘POST’: user1 = session.get(‘username’) user2 = request.form.get(‘user_id’) # 방 이름을 알파벳순으로 고정해 충돌 방지 room_users = sorted([user1, user2]) room_name = f”{room_users[0]}_{room_users[1]}” return redirect(url_for(‘private_chat’, room_name=room_name)) else: query = text(“"”SELECT id FROM USERINFO”””) with database.connect() as conn: result = conn.execute(query).fetchall() users = [] for row in result: users.append(row[0]) uname = session.get(‘username’) if not uname: return render_template(‘error.html’, alert_msg=”로그인이 필요합니다”) return render_template(‘chat.html’, users=users, username=session[‘username’])

@app.route(‘/chat/') def private_chat(room_name): # 방(room_name) 내부의 기존 암호문 (chat, aes_key)을 모두 가져옴 query = text("SELECT sender, chat, aes_key FROM CHAT WHERE room = :room ORDER BY id ASC") with database.connect() as conn: result = conn.execute(query, {"room": room_name}).fetchall() chats = [ {"sender": row[0], "chat": row[1], "aes_key": row[2]} for row in result ]

user = session.get('username')
if not user:
    return redirect(url_for('login'))
# 사용자가 방에 속하지 않으면 접근 금지
if user not in room_name:
    return redirect(url_for('index'))

# 방 이름(room_name)에서 나(me)와 상대방(other)을 분리
users = room_name.split("_")
if users[0] == user:
    other_user = users[1]
else:
    other_user = users[0]

# 상대방의 공개키(pub_key) 조회
pub_query = text("SELECT pub_key FROM USERINFO WHERE id = :id")
with database.connect() as conn:
    pub_result = conn.execute(pub_query, {"id": other_user}).fetchone()
    if not pub_result:
        return "상대방 공개키를 찾을 수 없습니다.", 500
    recipient_pub_key = pub_result[0]

# 내 개인키(pri_key) 조회
pri_query = text("SELECT pri_key FROM `PRIVATE` WHERE id = :id")
with database.connect() as conn:
    pri_result = conn.execute(pri_query, {"id": user}).fetchone()
    if not pri_result:
        return "내 개인키를 찾을 수 없습니다.", 500
    my_private_key = pri_result[0]

return render_template(
    'private_chat.html',
    room=room_name,
    username=user,
    chats=chats,
    recipient_pub_key=recipient_pub_key,
    my_private_key=my_private_key
)

@socketio.on(‘join’) def join(room_name): username = session.get(‘username’) print(f”{username}님이 {room_name} 방에 입장”) if username in room_name: join_room(room_name)

@socketio.on(‘private_message’) def handle_message(data): “”” 클라이언트에서 전송된 데이터를 통해: - encrypted_message: AES-256-CBC로 암호화된 메시지 문자열 (IV:CipherText 형태) - encrypted_key: RSA로 암호화된 AES 대칭키(Base64 문자열) - room: 방 이름 - sender: 보내는 사용자의 ID “”” print(f”[private_message] data received: {data}”) room_name = data.get(“room”) encrypted_message = data.get(“encrypted_message”) encrypted_key = data.get(“encrypted_key”) username = data.get(“sender”)

if not all([room_name, encrypted_message, encrypted_key, username]):
    print("필수 데이터 누락")
    return

query = text("""
    INSERT INTO CHAT(chat, aes_key, room, sender)
    VALUES (:chat, :aes_key, :room, :sender)
""")
with database.begin() as conn:
    conn.execute(query, {
        "chat": encrypted_message,
        "aes_key": encrypted_key,
        "room": room_name,
        "sender": username
    })

emit('private_message', {
    'sender': username,
    'encrypted_message': encrypted_message,
    'encrypted_key': encrypted_key
}, to=room_name)

if name == ‘main’: socketio.run(app, host=’0.0.0.0’, port=5000, debug=True)

```

데이터베이스 구조

USERINFO: id, name, pub_key(공개키)

PRIVATE: id, passwd(해시 비번), pri_key(개인키)

CHAT: sender, chat(암호문), aes_key(암호화된 대칭키), room

ACHAT: 공개 채팅 메시지 저장.

보안상 특징과 한계

장점

종단간 암호화: 서버가 메시지 내용을 복호화할 수 없음

비밀번호 해시: bcrypt로 안전하게 저장

한계 및 개선점

RSA 1024비트: 현대 표준 미달, 2048비트 이상 권장

세션 쿠키: 평문 저장, Secure/HttpOnly/SameSite 옵션 필수

TLS 미적용: 네트워크 감청, 세션 하이재킹 위험

XSS 방어 미흡: 쿠키 탈취 가능성

Tags:
Comments