Python 运维作业2——作业2(FTP+SMTP+日志)

更新地址:

  1. https://cmd.dayi.ink/jQKDT6i0QMSY8betlwVmzw
  2. https://type.dayiyi.top/index.php/archives/316/

要求

在当前路径下创建文件夹,并用自己的姓名命名,该文件夹下有若干中文命名的文件和文件夹。

编程实现如下功能:

(1)编写FTP程序,设置用户名、密码、上传下载速率限制,家目录为姓名命名的目录。用户具有更改目录、删除文件或目录、创建目录、上传下载文件的权限。FTP日志分别输出到屏幕和以学号命名的.Log文件中,输出至屏幕的日志等级为INFO,输出文件的日志等级为WARNING。该目录下,仅当有文件或文件被修改时,记录一条日志事件,要求包括事件发生时间、事件发生位置、事件的严重程度、事件内容,并将该事件输出至以学号命名的.Log文件中。

(2)编写程序实现邮件发送,将.log文件的内容作为邮件正文,.log文件为附件。

提交要求:粘贴两个程序的代码,上传运行过程的截图,截图形式如下所示。

基础的环境

1. 依赖

  1. WIN+X 选终端管理员,win10 下叫powershell(管理员)

安装包:

pip install -i https://mirrors.ustc.edu.cn/pypi/web/simple chardet colorama colorlog watchdog ansi2html pyftpdlib

如果有奇怪的报错,请不要用python 3.12

2. 需要一个SMTP服务器(可以自己去整一个)

这里具体可以去看章节:QQ邮箱SMTP地址

3. 配置

这个能跑,但是你想学到东东还是要自己写比较好。

小TIPS

  1. CTRL+H 直接把dayi替换掉。
  2. 发件人和收件人和登录名可能是一个,也可以不一样。(qq就可以全部一样)
  3. 授权码记得改。

再来个邮箱方便参考

  1. 授权码问题:

这里,授权码没改的缘故。

  1. 如果显示被占用什么的,去FTPserver文件换个端口。(在127.0.0.1附近,34行)之后登录记得改端口

QQ邮箱SMTP地址:

其他的邮箱服务商同理

1. 打开QQ邮箱:

https://mail.qq.com/

2. 打开设置,然后到这里的SMTP,点击开启服务

老板QQ:

3. 然后是密保信息

4. 复制这个就可以啦

密保信息填完之后,复制授权码

截图的要求:

登录FTP后:

1. filezilla 下载地址:https://pc.qq.com/detail/6/detail_22246.html

点击立即下载后,普通下载,证书忽略过期(速度快)

2. 使用filezilla登录FTP服务器:

用户名,密码在主程序的传参里

3. 登录

4. 尝试传送点文件ovo,增删改查。

日志正常输出(等级为WARNING)

5. 过了一会自动断开连接

进行文件的上传、下载和创建操作后:

感觉跟差不多,多弄点图

FTP程序显示的运行结果:

以学号命名的.Log文件内容:

邮件发送:

邮件图:

【文件!】文件下载:

0.依赖

pip install -i https://mirrors.ustc.edu.cn/pypi/web/simple chardet colorama colorlog watchdog ansi2html pyftpdlib

2. 仅修改main.py的配置即可。(修改授权码,学号,登录名等)

2. 下载后,运行main.py,会自动启动watch进程和FTP进程

3. ctrl+c 会尝试发送邮件

运行样子:

有个小bug,可能会重复发送两次邮件(但是完全不影响结果,晚点再改)

这下面是一些调试过程的图

邮件发送!

当当:

FTP服务器

发送邮件:

运行截图:

  1. 启动截图

  1. 登录:

  1. 增删改查

  1. 发送邮件

  1. 邮件:

内容:

代码:

ftp_server.py

import logging.handlers
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
import os

# 好看的日志输出,初始化日志模块
import prog_1

def main(username="dayi",password="abc123...",home_dir="./dayi",log_file="202123030408.log"):
  # 初始化日志模块
  os.makedirs(home_dir, exist_ok=True) # 创建日志目录
  logger = prog_1.init_logger(output_filename=log_file)
  logger.info("Ftp 服务器启动~~~~~~~~~~~~~~~~")

  handler = FTPHandler
  authorizer = DummyAuthorizer()
  authorizer.add_user(username, password, home_dir, perm="elradfmwM")  # 全部权限
  handler.authorizer = authorizer
  handler.banner = "Welcome to my FTP server! ovo! hello~~~~~~~~~my friend!"
  handler.masquerade_address = "127.0.0.1"  # 伪装地址
  handler.max_login_attempts = 3  # 最大登录尝试次数
  handler.permit_foreign_addresses = True  # 允许远程访问
  
  # 下载上传速度设置
  dtp_handler = handler.dtp_handler
  dtp_handler.read_limit = 1024 * 1024  
  dtp_handler.write_limit = 1024 * 1024  
  handler.dtp_handler = dtp_handler

  handler.tcp_no_delay = True  # 禁用Nagle算法
  handler.passive_ports = range(60000, 65535)  # 被动模式端口范围
  # 启动FTP服务器
  server = FTPServer(("127.0.0.1", 21), handler)
  logging.info("FTP server started on 127.0.0.1:21")
  server.serve_forever()


if __name__ == "__main__":
  main()

2. prog_1.py

from watchdog.observers import Observer  # 导入watchdog库中的Observer类,用于监控文件系统事件
from watchdog.events import FileSystemEventHandler  # 导入FileSystemEventHandler类,用于处理文件系统事件
import time
import logging
import colorlog
import os
from colorlog.escape_codes import escape_codes
import chardet

logger = logging.getLogger()  # 获取logger实例

def log_init(output_filename="114514.log"):
    global logger
    logger.setLevel(logging.INFO)  # 设置日志级别
    handler = colorlog.StreamHandler()  # 创建一个StreamHandler用于输出到控制台
    # -[func:%(funcName)s]
    handler.setFormatter(colorlog.ColoredFormatter(
        '[%(log_color)s%(asctime)s.%(msecs)03d%(reset)s][%(log_color)s%(levelname)s%(reset)s]: %(log_color)s%(message)s',  # 设置日志输出格式
        datefmt='%Y-%m-%d %H:%M:%S',  # 设置时间格式
        reset=True,  # 重置终端属性
        log_colors={  # 设置日志级别和日期时间的颜色
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red,bg_white',
            'asctime': 'blue',  # 设置日期时间的颜色
        },
    ))
    logger.addHandler(handler)

    # 输出到文件
    log_dir = '.'
    # os.makedirs(log_dir, exist_ok=True) # 创建日志目录
    file_handler = logging.FileHandler(os.path.join(log_dir, output_filename), 'a', encoding='utf-8')
    file_formatter = logging.Formatter('[%(asctime)s.%(msecs)03d][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.WARNING) # 只输出WARNING及以上级别的日志到文件
    logger.addHandler(file_handler)


def colorize_string(s, color):
    return f"{escape_codes[color]}{s}{escape_codes['reset']}"




# 用chardet来检测编码
def detect_encoding(file_path):
  # 先尝试GBK读取
  try:
    with open(file_path, 'r', encoding='gbk') as f:
       f.read()
    return 'gbk'
  except:
    with open(file_path, 'rb') as f:
      result = chardet.detect(f.read())
    return result['encoding']


class MyHandler(FileSystemEventHandler):
    def __init__(self):
      self.emit_once = True # 减少事件触发次数
    def on_any_event(self, event):
        # 如果包含__pycache__,则忽略
        if "__pycache__" in event.src_path:
          return

        # 事件类型描述
        event_description = {
          'modified': colorize_string("[修改]", 'red'),
          'created': colorize_string("[创建]", 'green'),
          'deleted': colorize_string("[删除]", 'yellow'),
          'moved': colorize_string("[移动]", 'blue')
        }
        # 目录或文件描述
        file_dir_desc = {
         True: colorize_string("【目录】", 'blue'), 
         False: colorize_string("【文件】", 'yellow')  
        }

        # 是否是目录
        dir_or_file = file_dir_desc[event.is_directory]
        logger_str = f"{event_description.get(event.event_type, '未知事件'+str(event.event_type))}{dir_or_file}: 路径:{event.src_path}"
        # 记录事件信息
        logger.warning(logger_str)

        # 对于文件的修改和创建事件,进一步处理
        if event.event_type in ['modified', 'created'] and not event.is_directory:
          file_name_without_path = event.src_path.split("\\")[-1]  # 从路径中提取文件名
          action = "被修改" if event.event_type == 'modified' else "被创建"
          # logger.info(f'文件: {file_name_without_path} {action}')
          
          # 修改事件
          if event.event_type == 'modified':
            return
            # bin
            try:
               with open(event.src_path, 'rb') as file:
                 logger.info('读取文件内容成功(二进制)')
                 file_contents = file.read()
                 print(f"=============={file_name_without_path}文件内容如下(二进制)==============")
                 print(f"{file_name_without_path}文件内容(二进制):\n{file_contents}")  # 打印文件内容
                 print(f"=============={file_name_without_path}文件内容如上(二进制)==============")
            except Exception as e:
               logger.error(f'读取文件时发生错误(二进制读取失败?!): {e}') 
            # UTF-8 + GBK + ANY
            try:
              encoding=detect_encoding(event.src_path)
              logger.info("检测到编码为:"+encoding)
              with open(event.src_path, 'r', encoding=encoding) as file:
                file_contents = file.read()
                print(f"=============={file_name_without_path}文件修改后内容如下({encoding})==============")
                print(f"文件内容({encoding}):\n{file_contents}")  # 打印文件内容
                print(f"=============={file_name_without_path}文件修改后内容如上({encoding})==============")
              #下面的代码应该不会触发,历史遗留 
            except Exception as e:
                try:
                  logger.info('尝试UTF-8编码读取...')
                  with open(event.src_path, 'r',encoding="utf-8") as file:
                    logger.info('读取文件内容成功(UTF-8)')
                    file_contents = file.read()
                    print("==============新文件内容如下==============")
                    print(f"文件内容(UTF-8):\n{file_contents}")  # 打印文件内容
                    print("==============新文件内容如上==============")
                except Exception as e:
                  logger.error(f'读取文件时发生错误: {e}')  # 使用日志记录异常信息
                  logger.info('尝试GBK编码读取...')
                  try:
                      with open(event.src_path, 'r',encoding="gbk") as file:
                        logger.info('读取文件内容成功(GBK)')
                        file_contents = file.read()
                        print("==============新文件内容如下==============")
                        print(f"文件内容:\n{file_contents}")  # 打印文件内容
                        print("==============新文件内容如上==============")
                  except Exception as e:
                        logger.error(f'读取文件再次发生错误,未知报错: {e}')
        
        # # 邮件发送!
        # logger.info('正在尝试发送邮件')
        # time.sleep(1)  # 减少CPU占用
        # try:
        #   import sendmail3
        #   sendmail3.main()
        # except Exception as e:
        #   logger.error(f'发送邮件时发生错误: {e}')

def monitor_folder(path='.'):
  event_handler = MyHandler()  # 创建MyHandler实例
  observer = Observer()  # 创建Observer实例
  observer.schedule(event_handler, path, recursive=False)  # 配置Observer监控的目录和事件处理器
  observer.start()  # 启动Observer
  try:  # 等待捕获键盘中断异常
    while True:  
      time.sleep(1)  # 减少CPU占用
  except KeyboardInterrupt:  # 捕获键盘中断(Ctrl+C)
    observer.stop()  # 停止Observer
    # print("[-]捕获到键盘中断,停止监控") 
    logger.warning("[-]捕获到键盘中断,停止监控")
  observer.join()  # 等待Observer线程结束



# 初始化日志模块
  
# 监控
def watch_file(path=".",output_filename="202123030408.log"):
  log_init(output_filename=output_filename) #初始化日志
  logger.info("看门狗!启动~~~~~~~~~~~~~~~~")
  # 这里实际上是两个模块,子进程
  now_path = path  # 获取当前脚本的路径
  logger.info("[+]当前目录为:"+now_path)
  logger.info("[+]开始监控文件夹:"+ now_path)
  monitor_folder(now_path)  # 调用monitor_folder函数开始监控当前脚本所在的目录

def init_logger(output_filename="202123030408.log"):
  log_init(output_filename=output_filename) #初始化日志
  logger.info("logger被初始化咧,当前输出文件为:"+output_filename)
  return logger


def getlogger():
   global logger
   return logger


# 


if __name__=="__main__":  # 如果此脚本作为主程序运行
  log_init() #初始化日志
  logger.info("logger测试")
  now_path = os.getcwd()  # 获取当前脚本的路径
  logger.info("[+]当前目录为:"+now_path)
  logger.info("[+]开始监控文件夹:"+ now_path)
  monitor_folder(now_path)  # 调用monitor_folder函数开始监控当前脚本所在的目录
  
  

send_mail_ovo.py

彩色日志!

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import os

from ansi2html import Ansi2HTMLConverter
import re

import prog_1




def send_email(sender, receivers, subject, body, attachment_path, log_path, image_path, smtp_server, smtp_server_port, mail_user, mail_pass,logger):

    logger.info("发送邮件配置如下:")
    logger.info("SMTP服务器:" + smtp_server)
    logger.info("SMTP服务器端口:" + str(smtp_server_port))
    logger.info("发件人:" + sender)
    logger.info("收件人:" + str(receivers))
    logger.info("主题:" + subject)
    logger.info("附件路径:" + attachment_path)
    logger.info("日志路径:" + log_path)


    msg = MIMEMultipart('related')  # 创建邮件对象,'related' 用于发送内嵌资源的邮件
    msg['From'] = sender
    msg['To'] = ";".join(receivers)
    msg['Subject'] = subject

    # HTML 邮件正文
    with open(log_path, "r", encoding="utf-8") as f:
        content = f.read()

    

    # 彩色
    conv = Ansi2HTMLConverter()
    with open(log_path, 'r',encoding='utf-8') as f:
      content = f.read()

    content = re.sub(r'\n', '\n', content)
    html_content = conv.convert(content, full=False)


    # 彩色!
    def colorize(ansi_code, text):
      color_map = {
          '31': 'red',
          '32': 'green',
          '33': 'pink',
          '34': 'blue'
      }
      color = color_map.get(ansi_code, 'black')
      return f'<span style="color:{color}">{text}</span>'
    
    def colorize_info(color, text):
        return f'<span style="color:{color}">{text}</span>'
    level_pattern = r'\[(WARNING|ERROR)\]'
    html_content = re.sub(level_pattern, lambda m: colorize_info('orange' if m.group(1) == 'WARNING' else 'red', m.group(0)), html_content)
    timestamp_pattern = r'\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+\]'
    html_content = re.sub(timestamp_pattern, lambda m: colorize_info('green', m.group(0)), html_content)

    pattern = r'<span class="ansi(\d+)">([^<]+)</span>'
    html_content = re.sub(pattern, lambda m: colorize(m.group(1), m.group(2)), html_content)
    
    with open("test.html",'w',encoding='utf-8') as f:
      f.write(f'<html><body>{html_content}</body></html>')


    html_content = re.sub(r'\n', '<br>', html_content)

    html_body = MIMEText(f'<html><body>{html_content}</body></html>', 'html', 'utf-8')
    # html_body = MIMEText(f'ovo', 'html', 'utf-8')
    msg.attach(html_body)

    # 添加图片
    # with open(image_path, 'rb') as f:
    #     mime_image = MIMEImage(f.read(), 'webp')
    # mime_image.add_header('Content-ID', '<image1>')
    # msg.attach(mime_image)

    # 添加附件
    with open(attachment_path, 'rb') as f:
        att1 = MIMEText(f.read(), 'base64', 'utf-8')
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = f'attachment; filename="{os.path.basename(attachment_path)}"'
    msg.attach(att1)

    # 添加日志文件作为附件
    with open(log_path, 'rb') as f:
        att2 = MIMEText(f.read(), 'base64', 'utf-8')
    att2["Content-Type"] = 'application/octet-stream'
    att2["Content-Disposition"] = f'attachment; filename="{os.path.basename(log_path)}"'
    msg.attach(att2)

    # 发送邮件
    try:
        smtpObj = smtplib.SMTP(smtp_server, smtp_server_port)
        smtpObj.ehlo()
        smtpObj.starttls()  # 如果SMTP服务器要求安全连接,则需要
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, msg.as_string())
        logger.info("邮件发送成功")
    except smtplib.SMTPException as e:
        logger.error(f"发送邮件出线错误!, 错误信息: {e}")
    finally:
        smtpObj.quit()


def main(logpath='202123030408.log',smtp_server='smtp.qq.com',smtp_server_port=587,mail_user='',mail_pass='',sender='',recipients='[]',subject='日志!'):
    
    logger=prog_1.init_logger(output_filename=logpath) #初始化日志
    logger.info("我是发邮件滴!!正在尝试发送邮件~~~~~~~~~~~~~~~")
    # sender = '851845341@qq.com' #发件人
    # receivers = ['lamu10@163.com'] #收件人
    receivers = recipients
    # subject = '日志!'
    body = '你好呀呀呀' # 其实没什么用
    attachment_path = 'test.html' #这个是html的渲染格式
    log_path = logpath #日志路径

    image_path = '====.pic' #已经废弃!
    # smtp_server = 'smtp.qq.com'
    # smtp_server_port = 587
    # mail_user = "851845341@qq.com"
    # mail_pass = "授权码"
    send_email(sender, receivers, subject, body, attachment_path, log_path, image_path, smtp_server, smtp_server_port, mail_user, mail_pass,logger)

if __name__ == '__main__':
    # 配置
    file_path = '202123030408.log' # 日志文件
    smtp_server = 'smtp.qq.com' # smtp 服务器
    smtp_server_port = 587 # smtp 服务器端口
    mail_user = '81@qq.com' # SMTP登录名
    sender = '8@qq.com' # 发件人
    mail_pass = 'lof' #SMTP密码
    recipients = ['l0@163.com'] #收件人
    subject = '日志!'
    # 发送邮件
    main(logpath=file_path, smtp_server=smtp_server,smtp_server_port=smtp_server_port,mail_user=mail_user,sender=sender,mail_pass=mail_pass,recipients=recipients,subject=subject)

main.py

import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os
from multiprocessing import Process

from prog_1 import watch_file
import prog_1
import ftp_server
from send_mail_ovo import main as send_mail_

if __name__ == '__main__':
  
  # 日志文件名
  logger_file_name = '202123030408.log'


  # 初始化日志模块
  logger=prog_1.init_logger(output_filename=logger_file_name)
  logger.info("主进程启动~~~~~~~~~~~~~~~~")


  # 看门狗!日志文件:logger_file_name
  p = Process(target=watch_file, args=("./dayi",logger_file_name))
  p.start()

  # FTP服务器, 用户名dayi,密码abc123...,主目录./dayi,日志文件:logger_file_name
  p2 = Process(target=ftp_server.main, args=("dayi","abc123...","./dayi","202123030408.log"))
  p2.start()


  # 主进程保持运行,直到手动中断
  
  # 发送邮件!(CTRL+C) 会自动发送哦,请记得修改配置文件
  file_path = '202123030408.log' # 日志文件
  smtp_server = 'smtp.exmail.qq.com' # smtp 服务器
  smtp_server_port = 587 # smtp 服务器端口
  mail_user = '202123030408@sdust.edu.cn' # SMTP登录名
  sender = 'dayi@sdust.edu.cn' # 发件人
  mail_pass = 'DvWsv' #SMTP密码(授权码)
  recipients = ['lamu10@163.com'] #收件人
  subject = '日志!'


  try:
      while True:
        time.sleep(0.5)
  except KeyboardInterrupt:
      print("[-]捕获到键盘中断,停止主进程")
      
      p2.kill()
      p.kill()

  logger.info('正在尝试发送邮件')
  time.sleep(1)  # 减少CPU占用
  
  mail_sent = False
  if not mail_sent:
    try:
        send_mail_(logpath=file_path, smtp_server=smtp_server,smtp_server_port=smtp_server_port,mail_user=mail_user,sender=sender,mail_pass=mail_pass,recipients=recipients,subject=subject)
        mail_sent = True
    except Exception as e:
        logger.error(f'发送邮件时发生错误: {e}')

  print("主进程已中断")

文件下载:

  1. 香港:https://pic.icee.top/blog/pic_bed/sharex/_pn-2024-03-20-18-23-02_Mollies_Knowledgeable_Submissive.rar
  2. CF:https://p.dabbit.net/blog/pic_bed/sharex/_pn-2024-03-20-18-23-02_Mollies_Knowledgeable_Submissive.rar
最后修改:2024 年 03 月 21 日
如果觉得我的文章对你有用,请随意赞赏