mirror of
https://github.com/BlueSkyXN/WorkerJS_CloudFlare_ImageBed.git
synced 2024-11-16 03:32:26 +08:00
0.9.17
2个ipfs api 示例
This commit is contained in:
parent
7fb7a7a914
commit
81a1226b7f
89
python-uploader/ipfs-filebase.py
Normal file
89
python-uploader/ipfs-filebase.py
Normal file
|
@ -0,0 +1,89 @@
|
|||
import sys
|
||||
import os
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
from botocore.exceptions import ClientError
|
||||
from datetime import datetime
|
||||
|
||||
# Filebase的S3兼容的API端点和凭据
|
||||
endpoint_url = 'https://s3.filebase.com'
|
||||
access_key = ''
|
||||
secret_key = ''
|
||||
|
||||
# 获取命令行参数
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python upload_to_filebase.py <local_file_path>")
|
||||
sys.exit(1)
|
||||
|
||||
local_file_path = sys.argv[1]
|
||||
# Filebase的S3兼容存储桶名称
|
||||
bucket_name = ''
|
||||
|
||||
# 使用 os.path.basename() 只提取文件名
|
||||
file_name, file_extension = os.path.splitext(os.path.basename(local_file_path))
|
||||
|
||||
# 生成当前时间戳,格式为:年-月-日_时-分-秒
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
|
||||
# 使用文件名 + 时间戳 作为 S3 文件名
|
||||
s3_file_key = f"{file_name}_{timestamp}{file_extension}"
|
||||
|
||||
# 创建S3客户端,增加超时设置
|
||||
try:
|
||||
print("Creating S3 client...")
|
||||
s3 = boto3.client(
|
||||
's3',
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_key,
|
||||
endpoint_url=endpoint_url,
|
||||
config=Config(signature_version='s3v4', connect_timeout=10, read_timeout=10)
|
||||
)
|
||||
print("S3 client created successfully.")
|
||||
except Exception as e:
|
||||
print(f"Error creating S3 client: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# 上传文件
|
||||
def upload_file_to_s3():
|
||||
try:
|
||||
print(f"Starting to upload '{local_file_path}' to bucket '{bucket_name}'...")
|
||||
s3.upload_file(local_file_path, bucket_name, s3_file_key)
|
||||
print(f"File '{local_file_path}' successfully uploaded to '{bucket_name}/{s3_file_key}'")
|
||||
except Exception as e:
|
||||
print(f"Error uploading file: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# 获取文件的元数据,包括CID
|
||||
def get_file_metadata(bucket_name, s3_file_key):
|
||||
try:
|
||||
print(f"Fetching metadata for file '{s3_file_key}' in bucket '{bucket_name}'...")
|
||||
response = s3.head_object(Bucket=bucket_name, Key=s3_file_key)
|
||||
metadata = response.get('Metadata', {})
|
||||
|
||||
if metadata:
|
||||
print("File metadata:", metadata)
|
||||
cid = metadata.get('cid', None) # 假设 CID 被存储为元数据的一部分
|
||||
if cid:
|
||||
print(f"CID: {cid}")
|
||||
else:
|
||||
print("CID not found in metadata.")
|
||||
else:
|
||||
print("No custom metadata found for the file.")
|
||||
except ClientError as e:
|
||||
print(f"Error fetching metadata: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# 测试连接是否正常
|
||||
def test_s3_connection():
|
||||
try:
|
||||
print("Testing S3 connection by listing buckets...")
|
||||
s3.list_buckets()
|
||||
print("S3 connection successful.")
|
||||
except Exception as e:
|
||||
print(f"Error in connecting to S3: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# 执行上传文件和获取CID的流程
|
||||
test_s3_connection() # 首先测试连接
|
||||
upload_file_to_s3() # 上传文件
|
||||
get_file_metadata(bucket_name, s3_file_key) # 获取元数据
|
93
python-uploader/ipfs-pinata.py
Normal file
93
python-uploader/ipfs-pinata.py
Normal file
|
@ -0,0 +1,93 @@
|
|||
import requests
|
||||
import json
|
||||
import argparse
|
||||
import os
|
||||
import mimetypes
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
||||
|
||||
# 硬编码JWT令牌
|
||||
JWT = ""
|
||||
|
||||
def pin_file_to_ipfs(file_path):
|
||||
# 解析绝对路径
|
||||
file_path = os.path.abspath(file_path)
|
||||
|
||||
if not os.path.isfile(file_path):
|
||||
print(f"错误: 文件 '{file_path}' 不存在或不是一个文件。")
|
||||
return
|
||||
|
||||
# 获取文件名
|
||||
file_name = os.path.basename(file_path)
|
||||
|
||||
# 手动添加对较新格式的 MIME 类型支持(如 .avif)
|
||||
mimetypes.add_type('image/avif', '.avif')
|
||||
mimetypes.add_type('video/webm', '.webm') # 其他可能需要的类型
|
||||
|
||||
# 使用 mimetypes 通过扩展名识别 MIME 类型
|
||||
mime_type, _ = mimetypes.guess_type(file_path)
|
||||
|
||||
# 如果 MIME 类型无法识别,设置为'application/octet-stream'
|
||||
if mime_type is None:
|
||||
mime_type = 'application/octet-stream'
|
||||
|
||||
try:
|
||||
with open(file_path, 'rb') as file:
|
||||
# 使用 multipart form data 传递文件和 MIME 类型
|
||||
form_data = MultipartEncoder(
|
||||
fields={
|
||||
'file': (file_name, file, mime_type),
|
||||
'pinataMetadata': json.dumps({
|
||||
'name': file_name
|
||||
}),
|
||||
'pinataOptions': json.dumps({
|
||||
'cidVersion': 0
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
# 设置请求头,确保 JWT 和 Content-Type 正确传递
|
||||
headers = {
|
||||
'Authorization': f'Bearer {JWT}',
|
||||
'Content-Type': form_data.content_type
|
||||
}
|
||||
|
||||
print(f"正在上传文件 '{file_name}',MIME类型: {mime_type}...")
|
||||
|
||||
# 向 Pinata 发起 POST 请求上传文件
|
||||
response = requests.post(
|
||||
"https://api.pinata.cloud/pinning/pinFileToIPFS",
|
||||
data=form_data,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
response_json = response.json()
|
||||
ipfs_hash = response_json["IpfsHash"]
|
||||
ipfs_url = f"https://gateway.pinata.cloud/ipfs/{ipfs_hash}"
|
||||
|
||||
print("文件成功上传至IPFS!")
|
||||
print(json.dumps(response_json, indent=4))
|
||||
print(f"可访问的IPFS URL: {ipfs_url}")
|
||||
else:
|
||||
print(f"上传失败。状态码: {response.status_code}")
|
||||
print(f"响应内容: {response.text}")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"请求错误: {e}")
|
||||
except Exception as e:
|
||||
print(f"发生错误: {e}")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="将文件上传到Pinata的IPFS。")
|
||||
parser.add_argument('file_path', type=str, help='要上传的文件的路径')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# 处理中文路径和空格,确保编码正确
|
||||
try:
|
||||
pin_file_to_ipfs(args.file_path)
|
||||
except Exception as e:
|
||||
print(f"上传时发生错误: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user