| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- #!/usr/bin/env python3
- """
- 清理 OSS 存储桶中的旧文件
- """
import os
import sys

import boto3
from botocore.config import Config
# OSS (S3-compatible object storage) connection settings.
#
# Every value can be overridden with the environment variable named on its
# line; when the variable is unset or empty the original built-in value is
# used, so existing invocations keep working unchanged.
# NOTE(review): the fallback access key / secret are still committed in
# source. Prefer supplying OSS_ACCESS_KEY_ID / OSS_SECRET_ACCESS_KEY through
# the environment and rotating the embedded key.
OSS_CONFIG = {
    'endpoint_url': os.environ.get('OSS_ENDPOINT_URL') or 'http://58.213.48.57:15900',
    'access_key_id': os.environ.get('OSS_ACCESS_KEY_ID') or 'wvp',
    'secret_access_key': (
        os.environ.get('OSS_SECRET_ACCESS_KEY')
        or '6MnZFxZxRwbvS01khA9ldiawJuc9mytyiq2kEv3k'
    ),
    'bucket_name': os.environ.get('OSS_BUCKET_NAME') or 'wvp',
    # Top-level "directory" inside the bucket that the commands operate on.
    'path_prefix': os.environ.get('OSS_PATH_PREFIX') or 'device',
}
def get_s3_client():
    """Build a boto3 S3 client for the endpoint described by ``OSS_CONFIG``.

    The client is configured for path-style addressing and ``s3v4`` request
    signing, with the region fixed to ``us-east-1``.

    Returns:
        The client object returned by ``boto3.client('s3', ...)``.
    """
    client_config = Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'},
    )
    connection = {
        'endpoint_url': OSS_CONFIG['endpoint_url'],
        'aws_access_key_id': OSS_CONFIG['access_key_id'],
        'aws_secret_access_key': OSS_CONFIG['secret_access_key'],
        'region_name': 'us-east-1',
        'config': client_config,
    }
    return boto3.client('s3', **connection)
def list_objects(s3_client, prefix=''):
    """Return the keys of all objects in the configured bucket under *prefix*.

    Calls ``list_objects_v2`` repeatedly, following the continuation token,
    until the service reports that the listing is no longer truncated.

    Args:
        s3_client: Client object exposing ``list_objects_v2`` (see
            ``get_s3_client``).
        prefix: Key prefix to filter on; the empty string applies no filter.

    Returns:
        A list of object key strings in the order the service returned them.
    """
    keys = []
    params = {
        'Bucket': OSS_CONFIG['bucket_name'],
        'Prefix': prefix,
    }
    while True:
        response = s3_client.list_objects_v2(**params)
        keys.extend(obj['Key'] for obj in response.get('Contents') or [])

        if not response.get('IsTruncated'):
            break

        token = response.get('NextContinuationToken')
        if not token:
            # A truncated page without a token cannot be continued; asking
            # again without one would re-fetch the first page forever.
            print("警告: 列表结果被截断但未返回续传标记,结果可能不完整")
            break
        params['ContinuationToken'] = token
    return keys
def delete_objects(s3_client, keys):
    """Delete the given object keys from the configured bucket.

    Keys are sent in batches of up to 1000 through the multi-object
    ``delete_objects`` call. If that call raises, the function switches to
    one ``delete_object`` request per key for the failing batch and for every
    remaining batch.

    Args:
        s3_client: Client object exposing ``delete_objects`` and
            ``delete_object`` (see ``get_s3_client``).
        keys: Sequence of object key strings to delete.

    Returns:
        The number of keys deleted without a reported error. Keys listed in a
        batch response's ``Errors`` entry, and keys whose single delete
        raised, are reported on stdout and not counted.
    """
    if not keys:
        print("没有需要删除的对象")
        return 0

    bucket = OSS_CONFIG['bucket_name']
    total = len(keys)
    deleted = 0
    use_batch = True

    # Send at most 1000 keys per multi-object delete request.
    for start in range(0, total, 1000):
        batch = keys[start:start + 1000]

        if use_batch:
            try:
                response = s3_client.delete_objects(
                    Bucket=bucket,
                    Delete={
                        'Objects': [{'Key': key} for key in batch],
                        'Quiet': True,
                    },
                )
            except Exception as e:
                # Best effort: treat any failure of the batch call as "not
                # supported" and fall through to per-key deletes from here on.
                print(f"批量删除不支持,降级为逐个删除: {e}")
                use_batch = False
            else:
                # In quiet mode the response only carries the failed keys.
                errors = (response or {}).get('Errors') or []
                for err in errors:
                    print(f" 删除 {err.get('Key')} 失败: {err.get('Message')}")
                succeeded = len(batch) - len(errors)
                deleted += succeeded
                print(f"已删除 {succeeded} 个文件 (总计 {start + len(batch)}/{total})")
                continue

        for key in batch:
            try:
                s3_client.delete_object(Bucket=bucket, Key=key)
            except Exception as e2:
                print(f" 删除 {key} 失败: {e2}")
                continue
            deleted += 1
            if deleted % 100 == 0 or deleted == total:
                print(f" 进度: {deleted}/{total}")
    return deleted
def cleanup_old_files(days=21, auto_confirm=False):
    """Delete objects whose date directory is older than *days* days.

    Lists every object under ``<path_prefix>/``, takes the second path
    component of each key (e.g. ``20260520`` in ``device/20260520/...``),
    parses it as ``%Y%m%d`` and selects the key when that date is earlier
    than ``now - days``. Keys whose second component is missing or is not a
    parseable date are left alone.

    Args:
        days: Keep objects from the most recent N days; delete older ones.
        auto_confirm: When true, skip the interactive y/n confirmation.

    Returns:
        The number of objects deleted (0 when nothing matched or the user
        declined).
    """
    import time
    from datetime import datetime

    s3_client = get_s3_client()

    # Cut-off instant: anything dated before this is considered old.
    cutoff_time = time.time() - (days * 24 * 3600)
    cutoff_date = datetime.fromtimestamp(cutoff_time).strftime('%Y-%m-%d')
    print(f"将删除 {cutoff_date} 之前的文件...")

    prefix = OSS_CONFIG['path_prefix'] + '/'
    print(f"正在列出 {OSS_CONFIG['bucket_name']}/{prefix} 下的对象...")
    all_objects = list_objects(s3_client, prefix)
    print(f"共找到 {len(all_objects)} 个对象")
    if not all_objects:
        print("没有对象需要删除")
        return 0

    old_objects = []
    for key in all_objects:
        parts = key.split('/')
        if len(parts) < 2 or not parts[1]:
            continue
        try:
            file_timestamp = datetime.strptime(parts[1], '%Y%m%d').timestamp()
        except (ValueError, OverflowError, OSError):
            # Not a usable date directory: skip the key. Only parsing and
            # conversion errors are ignored, so Ctrl-C still interrupts.
            continue
        if file_timestamp < cutoff_time:
            old_objects.append(key)

    print(f"将删除 {len(old_objects)} 个旧文件")
    if not old_objects:
        print("没有旧文件需要删除")
        return 0

    # Preview the first ten keys before asking for confirmation.
    print("前10个将删除的文件:")
    for key in old_objects[:10]:
        print(f" - {key}")
    if len(old_objects) > 10:
        print(f" ... 还有 {len(old_objects) - 10} 个")

    if auto_confirm:
        print(f"\n自动确认删除 {len(old_objects)} 个文件...")
    else:
        confirm = input(f"\n确认删除 {len(old_objects)} 个文件? (y/n): ")
        if confirm.lower() != 'y':
            print("已取消")
            return 0

    deleted = delete_objects(s3_client, old_objects)
    print(f"\n完成! 共删除 {deleted} 个文件")
    return deleted
def clear_all(auto_confirm=False):
    """Delete every object under ``<path_prefix>/`` in the configured bucket.

    Lists the objects, prints the first ten as a preview and, unless
    *auto_confirm* is true, asks for a y/n confirmation before deleting.

    Args:
        auto_confirm: When true, skip the interactive confirmation so the
            command can run unattended (``--yes`` on the command line).

    Returns:
        The number of objects deleted (0 when the prefix is empty or the
        user declined).
    """
    s3_client = get_s3_client()
    prefix = OSS_CONFIG['path_prefix'] + '/'
    all_objects = list_objects(s3_client, prefix)
    print(f"存储桶中共有 {len(all_objects)} 个对象")
    if not all_objects:
        print("没有对象需要删除")
        return 0

    # Preview the first ten keys.
    print("前10个对象:")
    for key in all_objects[:10]:
        print(f" - {key}")
    if len(all_objects) > 10:
        print(f" ... 还有 {len(all_objects) - 10} 个")

    if auto_confirm:
        print(f"\n自动确认删除 {len(all_objects)} 个文件...")
    else:
        confirm = input(f"\n警告: 将删除 ALL {len(all_objects)} 个文件! 确认? (y/n): ")
        if confirm.lower() != 'y':
            print("已取消")
            return 0

    deleted = delete_objects(s3_client, all_objects)
    print(f"\n完成! 共删除 {deleted} 个文件")
    return deleted


def main():
    """Command-line entry point: dispatch ``cleanup``, ``clear`` or ``list``.

    Reads ``sys.argv``. ``--yes`` anywhere on the command line disables the
    interactive confirmations. With no command, or an unknown one, a message
    is printed and the process exits with status 1.
    """
    if len(sys.argv) < 2:
        print("用法:")
        print(" python cleanup_oss.py cleanup [days] [--yes] - 清理N天前的旧文件")
        print(" python cleanup_oss.py clear [--yes] - 清空所有文件")
        print(" python cleanup_oss.py list - 列出所有文件")
        sys.exit(1)

    command = sys.argv[1]
    auto_confirm = '--yes' in sys.argv

    if command == 'cleanup':
        days = 7
        for arg in sys.argv[2:]:
            # isdecimal() only accepts characters int() can parse, unlike
            # isdigit(), which also accepts e.g. superscript digits.
            if arg.isdecimal():
                days = int(arg)
        cleanup_old_files(days, auto_confirm)
    elif command == 'clear':
        if not auto_confirm:
            confirm = input("确定要清空所有文件吗? 此操作不可恢复! (输入 'yes' 确认): ")
            if confirm != 'yes':
                print("已取消")
                return
        # Forward --yes so the second prompt inside clear_all is skipped too;
        # without the flag both confirmations are still asked.
        clear_all(auto_confirm=auto_confirm)
    elif command == 'list':
        s3_client = get_s3_client()
        prefix = OSS_CONFIG['path_prefix'] + '/'
        objects = list_objects(s3_client, prefix)
        print(f"共 {len(objects)} 个对象:")
        # Only the first 50 keys are printed; the rest are summarised.
        for obj in objects[:50]:
            print(f" - {obj}")
        if len(objects) > 50:
            print(f" ... 还有 {len(objects) - 50} 个")
    else:
        print(f"未知命令: {command}")
        sys.exit(1)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()
|