python 实现超时退出的三种方式
linux-dash
A beautiful web dashboard for Linux
项目地址:https://gitcode.com/gh_mirrors/li/linux-dash
免费下载资源
·
基于 signal模块实现:
signal包负责在Python程序内部处理信号,典型的操作包括预设信号处理函数,暂 停并等待信号,以及定时发出SIGALRM等。要注意,signal包主要是针对UNIX平台(比如Linux, MAC OS),而Windows内核中由于对信号机制的支持不充分,所以在Windows上的Python不能发挥信号系统的功能。
# coding:utf8
import time
import signal
# 自定义超时异常
class TimeoutError(Exception):
def __init__(self, msg):
super(TimeoutError, self).__init__()
self.msg = msg
def time_out(interval, callback):
def decorator(func):
def handler(signum, frame):
raise TimeoutError("run func timeout")
def wrapper(*args, **kwargs):
try:
signal.signal(signal.SIGALRM, handler)
signal.alarm(interval) # interval秒后向进程发送SIGALRM信号
result = func(*args, **kwargs)
signal.alarm(0) # 函数在规定时间执行完后关闭alarm闹钟
return result
except TimeoutError, e:
callback(e)
return wrapper
return decorator
def timeout_callback(e):
print(e.msg)
@time_out(2, timeout_callback)
def task1():
print("task1 start")
time.sleep(3)
print("task1 end")
@time_out(2, timeout_callback)
def task2():
print("task2 start")
time.sleep(1)
print("task2 end")
if __name__ == "__main__":
task1()
task2()
输出:
task1 start
run func timeout
task2 start
task2 end
基于子线程阻塞实现超时:
# coding:utf8
import time
import threading
def callback_func():
print('超时回调')
def time_out(interval, callback=None):
def decorator(func):
def wrapper(*args, **kwargs):
t =threading.Thread(target=func, args=args, kwargs=kwargs)
t.setDaemon(True) # 设置主线程技术子线程立刻结束
t.start()
t.join(interval) # 主线程阻塞等待interval秒
if t.is_alive() and callback:
return threading.Timer(0, callback).start() # 立即执行回调函数
else:
return
return wrapper
return decorator
@time_out(2, callback_func)
def task3(hh):
print('**********task3****************')
for i in range(3):
time.sleep(1)
print(i)
print(hh)
@time_out(2, callback_func)
def task4(hh):
print('**********task4****************')
for i in range(3):
# time.sleep(1)
print(i)
print(hh)
if __name__ == '__main__':
task3('参数')
task4('参数')
输出:
**********task3****************
0
参数
1
参数
超时回调
**********task4****************
0
参数
1
参数
2
参数
基于协程实现
def callback_func():
print('callback')
def time_out(interval, callback=None):
def decorator(func):
def wrapper(*args, **kwargs):
########## 该部分必选在requests之前导入
import gevent
from gevent import monkey
monkey.patch_all()
##########
try:
gevent.with_timeout(interval, func, *args, **kwargs)
except gevent.timeout.Timeout as e:
callback() if callback else None
return wrapper
return decorator
@time_out(3, callback_func)
def func(a, b):
import time
time.sleep(2)
print(a,b)
func(1, 2)
GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e
added ecosystem file for PM2 4 年前
5def40a3
Add host customization support for the NodeJS version 4 年前
更多推荐
已为社区贡献2条内容
所有评论(0)