forked from Telefonica/HomePWN
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtasks.py
More file actions
90 lines (80 loc) · 2.59 KB
/
tasks.py
File metadata and controls
90 lines (80 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from prompt_toolkit import print_formatted_text, HTML
from utils.custom_print import print_info, print_error
from os import kill
import signal
from time import sleep
class Task:
    """Registry of background tasks, shared through a singleton.

    Each task is stored in ``self.tasks`` under an increasing integer index
    and is described by a dict with three keys:

    * ``"thread"`` - the worker object (must expose ``name`` and
      ``is_alive()``; ``terminate()`` and ``join()`` are attempted on kill).
    * ``"pid"``    - optional process id that receives ``SIGINT`` on kill.
    * ``"wait"``   - seconds to sleep after signalling ``pid``.

    The first instance ever constructed becomes the shared one returned by
    :meth:`get_instance`. Instances constructed afterwards are independent,
    empty registries and never replace the shared one.
    """

    __instance = None

    @staticmethod
    def get_instance():
        """Return the shared registry, creating it on first use."""
        if Task.__instance is None:
            Task()
        return Task.__instance

    def __init__(self):
        """Create a registry and publish it as the singleton if none exists."""
        if Task.__instance is None:
            Task.__instance = self
        # Always initialise the state so that an instance built directly with
        # ``Task()`` after the singleton exists is still a usable object.
        self.tasks = {}
        self.count = 1

    def add_task(self, th, pid=None, seconds_wait=0):
        """Register a task under the next free index.

        Args:
            th: Worker object to track.
            pid: Optional process id to interrupt when the task is killed.
            seconds_wait: Seconds to wait after interrupting ``pid`` before
                terminating the worker (some tasks need time to stop).
        """
        self.tasks[self.count] = {
            "thread": th,
            "pid": pid,
            "wait": seconds_wait,
        }
        self.count += 1

    def get_tasks(self):
        """Return the live ``index -> task description`` mapping."""
        return self.tasks

    def kill_all_tasks(self):
        """Kill every registered task."""
        # Snapshot the keys: kill_task() deletes entries while we iterate.
        for key in list(self.tasks):
            self.kill_task(key)

    def exist_task(self, name):
        """Return True if a registered worker called *name* is still alive."""
        return any(
            entry["thread"].name == name and entry["thread"].is_alive()
            for entry in self.tasks.values()
        )

    def kill_task(self, index):
        """Stop the task registered under *index* and remove it.

        Args:
            index: Task index; anything ``int()`` accepts (e.g. ``"3"``).

        Problems are reported through ``print_info`` / ``print_error``
        instead of being raised to the caller.
        """
        try:
            key = int(index)
            entry = self.tasks.get(key)
            if not entry:
                print_info("Task not found")
                return

            worker = entry["thread"]
            pid = entry["pid"]
            name = worker.name
            try:
                if pid:
                    kill(pid, signal.SIGINT)
                    # Some tasks need some time to stop.
                    sleep(entry["wait"])
                worker.terminate()
                worker.join()
            except Exception:
                # Best effort: the process may already be gone, or the worker
                # may not support terminate(). The entry is removed either
                # way. KeyboardInterrupt / SystemExit are deliberately not
                # swallowed here.
                pass
            del self.tasks[key]
            print_info(f"Task {index} - {name} has been killed")
        except Exception as e:
            print(e)
            print_error("It has not been possible to kill the task")

    def show_tasks(self):
        """Print a tree listing every task with its index, name, pid and state."""
        if not self.tasks:
            print_info("There are no running tasks at this time")
            return
        print_formatted_text(HTML('''
<ansiyellow> Index (Thread)</ansiyellow>
-------------------------'''))
        for position, (key, value) in enumerate(self.tasks.items()):
            pid = value.get("pid") or ""
            alive = "Alive" if value['thread'].is_alive() else "Death"
            if position:
                # Connector line between consecutive entries.
                print(" |")
            print(f" |_ {key} = {value['thread'].name} {pid} ({alive})")
        print("")