首页下载资源后端file-download sftp

ZIPfile-download sftp

lavine081.29MB需要积分:1

资源文件列表:

file_download 大约有9个文件
  1. file_download/
  2. file_download/config.ini 1.29KB
  3. file_download/DEFECT.json 3.86KB
  4. file_download/file_download.py 55.03KB
  5. file_download/sftp需要安装的python依赖/
  6. file_download/sftp需要安装的python依赖/bcrypt-4.2.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl 267.08KB
  7. file_download/sftp需要安装的python依赖/paramiko-3.4.1-py3-none-any.whl 220.92KB
  8. file_download/sftp需要安装的python依赖/PyNaCl-1.5.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl 836.6KB
  9. file_download/t_biz_file_info.sql 6.14KB

资源介绍:

file-download sftp
#!/usr/bin/env python # encoding: utf-8 import argparse import configparser import ftplib import hashlib import json import logging import os import re import threading import time import traceback from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from enum import Enum from pathlib import PurePosixPath from posixpath import dirname from queue import Queue, Empty import paramiko import pymysql logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class Enums(Enum): # 文件类型 PATH_TYPE_UNKNOWN = ("UNKOWN", 0) PATH_TYPE_FILE = ("FILE", 1) PATH_TYPE_DIR = ("DIR", 2) # 下载方式 DOWNLOAD_MODE_FULL = ("FULL", 0) DOWNLOAD_MODE_INCREMENT = ("INCRE", 1) # 文件操作 OPERATIONS_DEL = ("DEL", 0) OPERATIONS_MV = ("MV", 1) # 数据类型 DATA_TYPE_CP = ("CP", 2) DATA_TYPE_CP_SP = ("CP_SP", 20) DATA_TYPE_FT = ("FT", 3) DATA_TYPE_FT_SP = ("FT_SP", 30) DATA_TYPE_DEFECT = ("DEFECT", 5) # 文件状态 QUEUE = ("下载成功,在解析队列中", 1) TRASH_FILE = ("垃圾文件", 14) DOWNLOAD_FAIL = ("下载失败", 15) @property def DESC(self): return self._value_[0] @property def CODE(self): return self._value_[1] @classmethod def get_code_by_desc(cls, description): for member in cls: if str(member.DESC).strip().upper() == str(description).strip().upper(): return member.CODE raise ValueError(f"No member with description '{description}' found") class ConfigLoader: """ 加载配置文件内容 """ def __init__(self, config_file_path): self.config_file_path = config_file_path self.config = configparser.ConfigParser() self.config.read(config_file_path, encoding="UTF-8") self.starrocks_host = self.config.get("starrocks", 'host') self.starrocks_port = self.config.get("starrocks", 'port') self.starrocks_http_port = self.config.get("starrocks", 'http_port') self.starrocks_database = self.config.get("starrocks", 'database') self.starrocks_user = self.config.get("starrocks", 'user') self.starrocks_password = self.config.get("starrocks", 'password') self.mysql_host = self.config.get("mysql", 'host') self.mysql_port = self.config.get("mysql", 'port') self.mysql_database = self.config.get("mysql", 'database') self.mysql_user = self.config.get("mysql", 'user') self.mysql_password = self.config.get("mysql", 'password') self.encode_list = str(self.config.get("common", 'encode_list')).split(",") self.defect_white_list = str(self.config.get("common", 'defect_white_list')) self.defect_black_list = str(self.config.get("common", 'defect_black_list')) self.defect_trash_file_delete = str(self.config.get("common", 'defect_trash_file_delete')) self.pool_size = int(self.config.get("common", 'pool_size')) def get(self, section, option): try: value = self.config.get(section, option) return value except (configparser.NoSectionError, configparser.NoOptionError): return None def set_value(self, section, option, value): if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value) def save(self): with open(self.config_file_path, 'w') as configfile: self.config.write(configfile) class Tools: def generator_id(self, string_str): """ 计算字符串ID :param string_str: 字符串 :return: """ return abs(int.from_bytes(bytes.fromhex(hashlib.md5(str(string_str).encode()).hexdigest()[-16:]), 'little', signed=True)) def calculate_md5(self, file_abs_path): """ 计算文件 md5 :param file_abs_path: 文件绝对路径 :return: """ hash_md5 = hashlib.md5() with open(file_abs_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def modify_file_name(self, lpath, md5): """ 修改所有文件名.可以根据需求自定义修改 :param lpath: :param md5: :return: """ file_name = os.path.basename(lpath) ldir = os.path.dirname(lpath) new_file_name = md5 + "::" + file_name new_lpath = os.path.join(ldir, new_file_name).replace("\\", '/') # 修改文件名 if os.path.exists(new_lpath): os.remove(new_lpath) os.rename(lpath, new_lpath) logging.info(f"Renamed: {lpath} -> {new_lpath}") return new_lpath def delete_file(self, file_abs_path): """ 判断指定路径的文件是否存在且是文件,若满足条件则删除该文件 :param file_abs_path: 要判断并删除的文件路径 :return: 无 """ if os.path.exists(file_abs_path) and os.path.isfile(file_abs_path): try: os.remove(file_abs_path) logging.warning(f"{file_abs_path}是垃圾文件, 已成功删除。") except FileNotFoundError: logging.info(f"{file_abs_path} 不存在,可能已被其他操作删除。") except PermissionError: logging.info(f"没有权限删除 {file_abs_path}。") else: logging.info(f"{file_abs_path} 不存在或不是一个文件。") def generator_file_list(self, backup_name, backup_path, bak_ftp_id, data_type, download_duration, download_end_time, download_start_time, download_status, file_list, instance_id, md5, mtime, new_lpath, rpath, size, src_ftp_id, config, tools): # 如果数据类型是cp_sp/ft_sp[cp_sp/ft_sp在一个工作流中,cp/sp数据都要录入] if str(data_type).strip().upper() == Enums.DATA_TYPE_CP_SP.DESC: # cp的录入一份 file_list.append([str(PurePosixPath(new_lpath).name), self.generator_id(str(PurePosixPath(rpath).name) + "::" + md5), Enums.DATA_TYPE_CP.CODE, 'data_source', mtime, size, md5, str(PurePosixPath(new_lpath).parent), str(PurePosixPath(rpath).parent), src_ftp_id, backup_name, backup_path, bak_ftp_id, download_duration, download_start_time, download_end_time, download_status, download_end_time, instance_id ]) # cp_sp的录入一份 file_list.append([str(PurePosixPath(new_lpath).name), self.generator_id(str(PurePosixPath(rpath).name) + "::" + md5), Enums.DATA_TYPE_CP_SP.CODE, 'data_source', mtime, size, md5, str(PurePosixPath(new_lpath).parent), str(PurePosixPath(rpath).parent), src_ftp_id, backup_name, backup_path, bak_ftp_id, download_duration, download_start_time,
100+评论
captcha