2023-11-13 22:05:46 +08:00
|
|
|
import base64
|
|
|
|
import hashlib
|
|
|
|
import hmac
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import time
|
|
|
|
from typing import Optional
|
|
|
|
|
2024-01-12 12:34:01 +08:00
|
|
|
from flask import current_app
|
2023-11-13 22:05:46 +08:00
|
|
|
|
2024-02-06 13:21:13 +08:00
|
|
|
from extensions.ext_storage import storage
|
|
|
|
|
2024-01-21 16:58:06 +08:00
|
|
|
IMAGE_EXTENSIONS = ['jpg', 'jpeg', 'png', 'webp', 'gif', 'svg']
|
|
|
|
IMAGE_EXTENSIONS.extend([ext.upper() for ext in IMAGE_EXTENSIONS])
|
2023-11-13 22:05:46 +08:00
|
|
|
|
2024-04-08 18:51:46 +08:00
|
|
|
|
2023-11-13 22:05:46 +08:00
|
|
|
class UploadFileParser:
|
|
|
|
@classmethod
|
|
|
|
def get_image_data(cls, upload_file, force_url: bool = False) -> Optional[str]:
|
|
|
|
if not upload_file:
|
|
|
|
return None
|
|
|
|
|
2024-01-21 16:58:06 +08:00
|
|
|
if upload_file.extension not in IMAGE_EXTENSIONS:
|
2023-11-13 22:05:46 +08:00
|
|
|
return None
|
|
|
|
|
|
|
|
if current_app.config['MULTIMODAL_SEND_IMAGE_FORMAT'] == 'url' or force_url:
|
2024-04-08 18:51:46 +08:00
|
|
|
return cls.get_signed_temp_image_url(upload_file.id)
|
2023-11-13 22:05:46 +08:00
|
|
|
else:
|
|
|
|
# get image file base64
|
|
|
|
try:
|
|
|
|
data = storage.load(upload_file.key)
|
|
|
|
except FileNotFoundError:
|
|
|
|
logging.error(f'File not found: {upload_file.key}')
|
|
|
|
return None
|
|
|
|
|
|
|
|
encoded_string = base64.b64encode(data).decode('utf-8')
|
|
|
|
return f'data:{upload_file.mime_type};base64,{encoded_string}'
|
|
|
|
|
|
|
|
@classmethod
|
2024-04-08 18:51:46 +08:00
|
|
|
def get_signed_temp_image_url(cls, upload_file_id) -> str:
|
2023-11-13 22:05:46 +08:00
|
|
|
"""
|
|
|
|
get signed url from upload file
|
|
|
|
|
|
|
|
:param upload_file: UploadFile object
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
base_url = current_app.config.get('FILES_URL')
|
2024-04-08 18:51:46 +08:00
|
|
|
image_preview_url = f'{base_url}/files/{upload_file_id}/image-preview'
|
2023-11-13 22:05:46 +08:00
|
|
|
|
|
|
|
timestamp = str(int(time.time()))
|
|
|
|
nonce = os.urandom(16).hex()
|
2024-04-08 18:51:46 +08:00
|
|
|
data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}"
|
2023-11-13 22:05:46 +08:00
|
|
|
secret_key = current_app.config['SECRET_KEY'].encode()
|
|
|
|
sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
|
|
|
|
encoded_sign = base64.urlsafe_b64encode(sign).decode()
|
|
|
|
|
|
|
|
return f"{image_preview_url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}"
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def verify_image_file_signature(cls, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
|
|
|
|
"""
|
|
|
|
verify signature
|
|
|
|
|
|
|
|
:param upload_file_id: file id
|
|
|
|
:param timestamp: timestamp
|
|
|
|
:param nonce: nonce
|
|
|
|
:param sign: signature
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}"
|
|
|
|
secret_key = current_app.config['SECRET_KEY'].encode()
|
|
|
|
recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
|
|
|
|
recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()
|
|
|
|
|
|
|
|
# verify signature
|
|
|
|
if sign != recalculated_encoded_sign:
|
|
|
|
return False
|
|
|
|
|
|
|
|
current_time = int(time.time())
|
2024-05-27 22:20:49 +08:00
|
|
|
return current_time - int(timestamp) <= current_app.config.get('FILES_ACCESS_TIMEOUT')
|