Python 运维作业2——作业2(FTP+SMTP+日志)
更新地址:
要求
在当前路径下创建文件夹,并用自己的姓名命名,该文件夹下有若干中文命名的文件和文件夹。
编程实现如下功能:
(1)编写FTP程序,设置用户名、密码、上传下载速率限制,家目录为姓名命名的目录。用户具有更改目录、删除文件或目录、创建目录、上传下载文件的权限。FTP日志分别输出到屏幕和以学号命名的.Log文件中,输出至屏幕的日志等级为INFO,输出文件的日志等级为WARNING。该目录下,仅当有文件或文件被修改时,记录一条日志事件,要求包括事件发生时间、事件发生位置、事件的严重程度、事件内容,并将该事件输出至以学号命名的.Log文件中。
(2)编写程序实现邮件发送,将.log文件的内容作为邮件正文,.log文件为附件。
提交要求:粘贴两个程序的代码,上传运行过程的截图,截图形式如下所示。
基础的环境
1. 依赖
- WIN+X 选终端管理员,win10 下叫powershell(管理员)
安装包:
pip install -i https://mirrors.ustc.edu.cn/pypi/web/simple chardet colorama colorlog watchdog ansi2html pyftpdlib
如果有奇怪的报错,请不要用python 3.12
2. 需要一个SMTP服务器(可以自己去整一个)
这里具体可以去看章节:QQ邮箱SMTP地址
3. 配置
这个能跑,但是你想学到东东还是要自己写比较好。
小TIPS
- CTRL+H 直接把dayi替换掉。
- 发件人和收件人和登录名可能是一个,也可以不一样。(qq就可以全部一样)
- 授权码记得改。
再来个邮箱方便参考
- 授权码问题:
这里,授权码没改的缘故。
- 如果显示被占用什么的,去FTPserver文件换个端口。(在127.0.0.1附近,34行)之后登录记得改端口
QQ邮箱SMTP地址:
其他的邮箱服务商同理
1. 打开QQ邮箱:
2. 打开设置,然后到这里的SMTP,点击开启服务
老板QQ:
3. 然后是密保信息
4. 复制这个就可以啦
密保信息填完之后,复制授权码
截图的要求:
登录FTP后:
1. filezilla 下载地址:https://pc.qq.com/detail/6/detail_22246.html
点击立即下载后,普通下载,证书忽略过期(速度快)
2. 使用filezilla登录FTP服务器:
用户名,密码在主程序的传参里
3. 登录
4. 尝试传送点文件ovo,增删改查。
日志正常输出(等级为WARNING)
5. 过了一会自动断开连接
进行文件的上传、下载和创建操作后:
感觉跟差不多,多弄点图
FTP程序显示的运行结果:
以学号命名的.Log文件内容:
邮件发送:
邮件图:
【文件!】文件下载:
0.依赖
pip install -i https://mirrors.ustc.edu.cn/pypi/web/simple chardet colorama colorlog watchdog ansi2html pyftpdlib
2. 仅修改main.py的配置即可。(修改授权码,学号,登录名等)
2. 下载后,运行main.py,会自动启动watch进程和FTP进程
3. ctrl+c 会尝试发送邮件
运行样子:
有个小bug,可能会重复发送两次邮件(但是完全不影响结果,晚点再改)
这下面是一些调试过程的图
邮件发送!
当当:
FTP服务器
发送邮件:
运行截图:
- 启动截图
- 登录:
- 增删改查
- 发送邮件
- 邮件:
内容:
代码:
ftp_server.py
import logging.handlers
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
import os
# 好看的日志输出,初始化日志模块
import prog_1
def main(username="dayi",password="abc123...",home_dir="./dayi",log_file="202123030408.log"):
# 初始化日志模块
os.makedirs(home_dir, exist_ok=True) # 创建日志目录
logger = prog_1.init_logger(output_filename=log_file)
logger.info("Ftp 服务器启动~~~~~~~~~~~~~~~~")
handler = FTPHandler
authorizer = DummyAuthorizer()
authorizer.add_user(username, password, home_dir, perm="elradfmwM") # 全部权限
handler.authorizer = authorizer
handler.banner = "Welcome to my FTP server! ovo! hello~~~~~~~~~my friend!"
handler.masquerade_address = "127.0.0.1" # 伪装地址
handler.max_login_attempts = 3 # 最大登录尝试次数
handler.permit_foreign_addresses = True # 允许远程访问
# 下载上传速度设置
dtp_handler = handler.dtp_handler
dtp_handler.read_limit = 1024 * 1024
dtp_handler.write_limit = 1024 * 1024
handler.dtp_handler = dtp_handler
handler.tcp_no_delay = True # 禁用Nagle算法
handler.passive_ports = range(60000, 65535) # 被动模式端口范围
# 启动FTP服务器
server = FTPServer(("127.0.0.1", 21), handler)
logging.info("FTP server started on 127.0.0.1:21")
server.serve_forever()
if __name__ == "__main__":
main()
2. prog_1.py
from watchdog.observers import Observer # 导入watchdog库中的Observer类,用于监控文件系统事件
from watchdog.events import FileSystemEventHandler # 导入FileSystemEventHandler类,用于处理文件系统事件
import time
import logging
import colorlog
import os
from colorlog.escape_codes import escape_codes
import chardet
logger = logging.getLogger() # 获取logger实例
def log_init(output_filename="114514.log"):
global logger
logger.setLevel(logging.INFO) # 设置日志级别
handler = colorlog.StreamHandler() # 创建一个StreamHandler用于输出到控制台
# -[func:%(funcName)s]
handler.setFormatter(colorlog.ColoredFormatter(
'[%(log_color)s%(asctime)s.%(msecs)03d%(reset)s][%(log_color)s%(levelname)s%(reset)s]: %(log_color)s%(message)s', # 设置日志输出格式
datefmt='%Y-%m-%d %H:%M:%S', # 设置时间格式
reset=True, # 重置终端属性
log_colors={ # 设置日志级别和日期时间的颜色
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
'asctime': 'blue', # 设置日期时间的颜色
},
))
logger.addHandler(handler)
# 输出到文件
log_dir = '.'
# os.makedirs(log_dir, exist_ok=True) # 创建日志目录
file_handler = logging.FileHandler(os.path.join(log_dir, output_filename), 'a', encoding='utf-8')
file_formatter = logging.Formatter('[%(asctime)s.%(msecs)03d][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.WARNING) # 只输出WARNING及以上级别的日志到文件
logger.addHandler(file_handler)
def colorize_string(s, color):
return f"{escape_codes[color]}{s}{escape_codes['reset']}"
# 用chardet来检测编码
def detect_encoding(file_path):
# 先尝试GBK读取
try:
with open(file_path, 'r', encoding='gbk') as f:
f.read()
return 'gbk'
except:
with open(file_path, 'rb') as f:
result = chardet.detect(f.read())
return result['encoding']
class MyHandler(FileSystemEventHandler):
def __init__(self):
self.emit_once = True # 减少事件触发次数
def on_any_event(self, event):
# 如果包含__pycache__,则忽略
if "__pycache__" in event.src_path:
return
# 事件类型描述
event_description = {
'modified': colorize_string("[修改]", 'red'),
'created': colorize_string("[创建]", 'green'),
'deleted': colorize_string("[删除]", 'yellow'),
'moved': colorize_string("[移动]", 'blue')
}
# 目录或文件描述
file_dir_desc = {
True: colorize_string("【目录】", 'blue'),
False: colorize_string("【文件】", 'yellow')
}
# 是否是目录
dir_or_file = file_dir_desc[event.is_directory]
logger_str = f"{event_description.get(event.event_type, '未知事件'+str(event.event_type))}{dir_or_file}: 路径:{event.src_path}"
# 记录事件信息
logger.warning(logger_str)
# 对于文件的修改和创建事件,进一步处理
if event.event_type in ['modified', 'created'] and not event.is_directory:
file_name_without_path = event.src_path.split("\\")[-1] # 从路径中提取文件名
action = "被修改" if event.event_type == 'modified' else "被创建"
# logger.info(f'文件: {file_name_without_path} {action}')
# 修改事件
if event.event_type == 'modified':
return
# bin
try:
with open(event.src_path, 'rb') as file:
logger.info('读取文件内容成功(二进制)')
file_contents = file.read()
print(f"=============={file_name_without_path}文件内容如下(二进制)==============")
print(f"{file_name_without_path}文件内容(二进制):\n{file_contents}") # 打印文件内容
print(f"=============={file_name_without_path}文件内容如上(二进制)==============")
except Exception as e:
logger.error(f'读取文件时发生错误(二进制读取失败?!): {e}')
# UTF-8 + GBK + ANY
try:
encoding=detect_encoding(event.src_path)
logger.info("检测到编码为:"+encoding)
with open(event.src_path, 'r', encoding=encoding) as file:
file_contents = file.read()
print(f"=============={file_name_without_path}文件修改后内容如下({encoding})==============")
print(f"文件内容({encoding}):\n{file_contents}") # 打印文件内容
print(f"=============={file_name_without_path}文件修改后内容如上({encoding})==============")
#下面的代码应该不会触发,历史遗留
except Exception as e:
try:
logger.info('尝试UTF-8编码读取...')
with open(event.src_path, 'r',encoding="utf-8") as file:
logger.info('读取文件内容成功(UTF-8)')
file_contents = file.read()
print("==============新文件内容如下==============")
print(f"文件内容(UTF-8):\n{file_contents}") # 打印文件内容
print("==============新文件内容如上==============")
except Exception as e:
logger.error(f'读取文件时发生错误: {e}') # 使用日志记录异常信息
logger.info('尝试GBK编码读取...')
try:
with open(event.src_path, 'r',encoding="gbk") as file:
logger.info('读取文件内容成功(GBK)')
file_contents = file.read()
print("==============新文件内容如下==============")
print(f"文件内容:\n{file_contents}") # 打印文件内容
print("==============新文件内容如上==============")
except Exception as e:
logger.error(f'读取文件再次发生错误,未知报错: {e}')
# # 邮件发送!
# logger.info('正在尝试发送邮件')
# time.sleep(1) # 减少CPU占用
# try:
# import sendmail3
# sendmail3.main()
# except Exception as e:
# logger.error(f'发送邮件时发生错误: {e}')
def monitor_folder(path='.'):
event_handler = MyHandler() # 创建MyHandler实例
observer = Observer() # 创建Observer实例
observer.schedule(event_handler, path, recursive=False) # 配置Observer监控的目录和事件处理器
observer.start() # 启动Observer
try: # 等待捕获键盘中断异常
while True:
time.sleep(1) # 减少CPU占用
except KeyboardInterrupt: # 捕获键盘中断(Ctrl+C)
observer.stop() # 停止Observer
# print("[-]捕获到键盘中断,停止监控")
logger.warning("[-]捕获到键盘中断,停止监控")
observer.join() # 等待Observer线程结束
# 初始化日志模块
# 监控
def watch_file(path=".",output_filename="202123030408.log"):
log_init(output_filename=output_filename) #初始化日志
logger.info("看门狗!启动~~~~~~~~~~~~~~~~")
# 这里实际上是两个模块,子进程
now_path = path # 获取当前脚本的路径
logger.info("[+]当前目录为:"+now_path)
logger.info("[+]开始监控文件夹:"+ now_path)
monitor_folder(now_path) # 调用monitor_folder函数开始监控当前脚本所在的目录
def init_logger(output_filename="202123030408.log"):
log_init(output_filename=output_filename) #初始化日志
logger.info("logger被初始化咧,当前输出文件为:"+output_filename)
return logger
def getlogger():
global logger
return logger
#
if __name__=="__main__": # 如果此脚本作为主程序运行
log_init() #初始化日志
logger.info("logger测试")
now_path = os.getcwd() # 获取当前脚本的路径
logger.info("[+]当前目录为:"+now_path)
logger.info("[+]开始监控文件夹:"+ now_path)
monitor_folder(now_path) # 调用monitor_folder函数开始监控当前脚本所在的目录
send_mail_ovo.py
彩色日志!
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import os
from ansi2html import Ansi2HTMLConverter
import re
import prog_1
def send_email(sender, receivers, subject, body, attachment_path, log_path, image_path, smtp_server, smtp_server_port, mail_user, mail_pass,logger):
logger.info("发送邮件配置如下:")
logger.info("SMTP服务器:" + smtp_server)
logger.info("SMTP服务器端口:" + str(smtp_server_port))
logger.info("发件人:" + sender)
logger.info("收件人:" + str(receivers))
logger.info("主题:" + subject)
logger.info("附件路径:" + attachment_path)
logger.info("日志路径:" + log_path)
msg = MIMEMultipart('related') # 创建邮件对象,'related' 用于发送内嵌资源的邮件
msg['From'] = sender
msg['To'] = ";".join(receivers)
msg['Subject'] = subject
# HTML 邮件正文
with open(log_path, "r", encoding="utf-8") as f:
content = f.read()
# 彩色
conv = Ansi2HTMLConverter()
with open(log_path, 'r',encoding='utf-8') as f:
content = f.read()
content = re.sub(r'\n', '\n', content)
html_content = conv.convert(content, full=False)
# 彩色!
def colorize(ansi_code, text):
color_map = {
'31': 'red',
'32': 'green',
'33': 'pink',
'34': 'blue'
}
color = color_map.get(ansi_code, 'black')
return f'<span style="color:{color}">{text}</span>'
def colorize_info(color, text):
return f'<span style="color:{color}">{text}</span>'
level_pattern = r'\[(WARNING|ERROR)\]'
html_content = re.sub(level_pattern, lambda m: colorize_info('orange' if m.group(1) == 'WARNING' else 'red', m.group(0)), html_content)
timestamp_pattern = r'\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+\]'
html_content = re.sub(timestamp_pattern, lambda m: colorize_info('green', m.group(0)), html_content)
pattern = r'<span class="ansi(\d+)">([^<]+)</span>'
html_content = re.sub(pattern, lambda m: colorize(m.group(1), m.group(2)), html_content)
with open("test.html",'w',encoding='utf-8') as f:
f.write(f'<html><body>{html_content}</body></html>')
html_content = re.sub(r'\n', '<br>', html_content)
html_body = MIMEText(f'<html><body>{html_content}</body></html>', 'html', 'utf-8')
# html_body = MIMEText(f'ovo', 'html', 'utf-8')
msg.attach(html_body)
# 添加图片
# with open(image_path, 'rb') as f:
# mime_image = MIMEImage(f.read(), 'webp')
# mime_image.add_header('Content-ID', '<image1>')
# msg.attach(mime_image)
# 添加附件
with open(attachment_path, 'rb') as f:
att1 = MIMEText(f.read(), 'base64', 'utf-8')
att1["Content-Type"] = 'application/octet-stream'
att1["Content-Disposition"] = f'attachment; filename="{os.path.basename(attachment_path)}"'
msg.attach(att1)
# 添加日志文件作为附件
with open(log_path, 'rb') as f:
att2 = MIMEText(f.read(), 'base64', 'utf-8')
att2["Content-Type"] = 'application/octet-stream'
att2["Content-Disposition"] = f'attachment; filename="{os.path.basename(log_path)}"'
msg.attach(att2)
# 发送邮件
try:
smtpObj = smtplib.SMTP(smtp_server, smtp_server_port)
smtpObj.ehlo()
smtpObj.starttls() # 如果SMTP服务器要求安全连接,则需要
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, msg.as_string())
logger.info("邮件发送成功")
except smtplib.SMTPException as e:
logger.error(f"发送邮件出线错误!, 错误信息: {e}")
finally:
smtpObj.quit()
def main(logpath='202123030408.log',smtp_server='smtp.qq.com',smtp_server_port=587,mail_user='',mail_pass='',sender='',recipients='[]',subject='日志!'):
logger=prog_1.init_logger(output_filename=logpath) #初始化日志
logger.info("我是发邮件滴!!正在尝试发送邮件~~~~~~~~~~~~~~~")
# sender = '851845341@qq.com' #发件人
# receivers = ['lamu10@163.com'] #收件人
receivers = recipients
# subject = '日志!'
body = '你好呀呀呀' # 其实没什么用
attachment_path = 'test.html' #这个是html的渲染格式
log_path = logpath #日志路径
image_path = '====.pic' #已经废弃!
# smtp_server = 'smtp.qq.com'
# smtp_server_port = 587
# mail_user = "851845341@qq.com"
# mail_pass = "授权码"
send_email(sender, receivers, subject, body, attachment_path, log_path, image_path, smtp_server, smtp_server_port, mail_user, mail_pass,logger)
if __name__ == '__main__':
# 配置
file_path = '202123030408.log' # 日志文件
smtp_server = 'smtp.qq.com' # smtp 服务器
smtp_server_port = 587 # smtp 服务器端口
mail_user = '81@qq.com' # SMTP登录名
sender = '8@qq.com' # 发件人
mail_pass = 'lof' #SMTP密码
recipients = ['l0@163.com'] #收件人
subject = '日志!'
# 发送邮件
main(logpath=file_path, smtp_server=smtp_server,smtp_server_port=smtp_server_port,mail_user=mail_user,sender=sender,mail_pass=mail_pass,recipients=recipients,subject=subject)
main.py
import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os
from multiprocessing import Process
from prog_1 import watch_file
import prog_1
import ftp_server
from send_mail_ovo import main as send_mail_
if __name__ == '__main__':
# 日志文件名
logger_file_name = '202123030408.log'
# 初始化日志模块
logger=prog_1.init_logger(output_filename=logger_file_name)
logger.info("主进程启动~~~~~~~~~~~~~~~~")
# 看门狗!日志文件:logger_file_name
p = Process(target=watch_file, args=("./dayi",logger_file_name))
p.start()
# FTP服务器, 用户名dayi,密码abc123...,主目录./dayi,日志文件:logger_file_name
p2 = Process(target=ftp_server.main, args=("dayi","abc123...","./dayi","202123030408.log"))
p2.start()
# 主进程保持运行,直到手动中断
# 发送邮件!(CTRL+C) 会自动发送哦,请记得修改配置文件
file_path = '202123030408.log' # 日志文件
smtp_server = 'smtp.exmail.qq.com' # smtp 服务器
smtp_server_port = 587 # smtp 服务器端口
mail_user = '202123030408@sdust.edu.cn' # SMTP登录名
sender = 'dayi@sdust.edu.cn' # 发件人
mail_pass = 'DvWsv' #SMTP密码(授权码)
recipients = ['lamu10@163.com'] #收件人
subject = '日志!'
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("[-]捕获到键盘中断,停止主进程")
p2.kill()
p.kill()
logger.info('正在尝试发送邮件')
time.sleep(1) # 减少CPU占用
mail_sent = False
if not mail_sent:
try:
send_mail_(logpath=file_path, smtp_server=smtp_server,smtp_server_port=smtp_server_port,mail_user=mail_user,sender=sender,mail_pass=mail_pass,recipients=recipients,subject=subject)
mail_sent = True
except Exception as e:
logger.error(f'发送邮件时发生错误: {e}')
print("主进程已中断")