significant improvement in class formation

This commit is contained in:
anoduck 2023-10-20 18:39:31 -04:00
parent 6afad9fad4
commit d4c31ce242

View file

@ -235,7 +235,6 @@ def strainer(pkt) -> None:
# ----------------------------------------------------------------
class NetDev:
def __init__(self) -> None:
self.log = Log.get_log
self.macaddr = fake.wifi_essid()
pge = Purge()
self.name = pge.get_name()
@ -251,39 +250,39 @@ class NetDev:
try:
os.system(f'ip link set {self.name} up')
os.system(f'iw dev {self.name} interface add {self.mon_crtd} type monitor')
self.log.debug('Created ', self.mon_crtd)
log.debug('Created ', self.mon_crtd)
os.system(f'ip link set {self.mon_crtd} down')
os.system(f'ip link set {self.mon_crtd} address {self.macaddr}')
self.log.debug('Set device address to ', self.macaddr)
log.debug('Set device address to ', self.macaddr)
os.system(f'ip link set {self.mon_crtd} up')
self.log.debug('Set device up')
log.debug('Set device up')
os.system('iw set reg US')
self.log.debug('Set device registry to US')
self.log.info('Device is fully configured and up')
log.debug('Set device registry to US')
log.info('Device is fully configured and up')
return True
except os.error as e:
self.log.debug('Failed to create ', self.mon_crtd, e)
log.debug('Failed to create ', self.mon_crtd, e)
print('Creation of new monitor self.name failed:', e)
sys.exit(1)
def switch_if(self) -> bool:
try:
os.system(f'ip link set {self.name} down')
self.log.debug('Set device down')
log.debug('Set device down')
os.system(f'ip link set {self.name} address {self.macaddr}')
self.log.debug('Set device address to ', self.macaddr)
log.debug('Set device address to ', self.macaddr)
os.system(f'iw dev {self.name} set type monitor')
self.log.debug(self.name, ' switched to monitor')
log.debug(self.name, ' switched to monitor')
os.system(f'ip link set {self.name} up')
# (below) setting registry is known to cause issues.
os.system('iw set reg US')
self.log.debug('Set device registry to US')
log.debug('Set device registry to US')
scapyconfig.iface = self.name
self.log.info('Set scapy config self.name to: ', self.name)
self.log.info('Device is fully configured and up')
log.info('Set scapy config self.name to: ', self.name)
log.info('Device is fully configured and up')
return True
except os.error as e:
self.log.debug('Failed to switch ', self.name, ' type', e)
log.debug('Failed to switch ', self.name, ' type', e)
print('Failed to change ', self.name, ' mode', e)
sys.exit(1)
@ -299,9 +298,9 @@ class NetDev:
Returns:
str: The name of the created or switched monitor interface.
"""
self.log.debug('mac_address type: ', str(type(self.macaddr)))
self.log.debug('mac_address: ', str(self.macaddr))
self.log.info('Starting monitor interface')
log.debug('mac_address type: ', str(type(self.macaddr)))
log.debug('mac_address: ', str(self.macaddr))
log.info('Starting monitor interface')
if self.mon_type == 'create':
self.create_if()
mon_if = self.mon_crtd
@ -312,22 +311,22 @@ class NetDev:
return mon_if
else:
Exception('Invalid monitor type')
self.log.debug('Invalid monitor type')
log.debug('Invalid monitor type')
sys.exit(1)
def channel_runner(self, mon_if) -> None:
self.log.info('Channel Runner NG started.')
self.log.info('Preliminary channel list: ', str(self.channels))
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ', str(self.channels))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
self.log.info('Channel list: ', str(chlist))
log.info('Channel list: ', str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system(f'iw dev {mon_if} set channel {str(ichan)}')
# self.log.debug('Channel set to ', str(ichan))
# log.debug('Channel set to ', str(ichan))
sleep(14.7)
@ -356,7 +355,7 @@ class Purge:
self.mon_type = kwargs.get('mon_type')
self.valid_file = kwargs.get('valid_file')
self.channels = kwargs.get('channels')
self.log = Log.get_log
log = get_log
self.scan_df = DataFrame.get_df
def start_purge(self) -> None:
@ -382,45 +381,24 @@ class Purge:
print('Enter Ctrl+C TWICE to fully stop the script.')
dev = NetDev()
mon_if = dev.start_monitor()
self.log.info('interface ', mon_if, ' is up and running.')
log.info('interface ', mon_if, ' is up and running.')
# vfile = self.get_file()
# self.log.info('We will be writing captured macs to ', str(self.valid_file))
# log.info('We will be writing captured macs to ', str(self.valid_file))
chop = asyncio.to_thread(dev.channel_runner(mon_if)) # type: ignore
chopper = asyncio.create_task(chop)
self.log.info('Channel runner started.')
log.info('Channel runner started.')
await asyncio.sleep(0)
while True:
self.log.info('starting sniffer')
log.info('starting sniffer')
asniff = AsyncSniffer(iface=mon_if,
prn=strainer,
store=False, monitor=True)
asniff.start()
self.log.info('asniffer started')
log.info('asniffer started')
forever_wait = threading.Event()
forever_wait.wait()
class Log:
def __init__(self) -> None:
self.log = logging.getLogger(__name__)
self.file = os.path.abspath('/var/log/ctiger.log')
def get_log(self):
return self.log
def set_log(self):
logging.basicConfig(filename=self.file, level=args.log_level,
format='%(asctime)s %(levelname)s %(message)s')
# create logger
logging.getLogger("scapy.runtime").setLevel(logging.DEBUG)
handler = logging.FileHandler(self.file, mode='a', encoding='utf-8')
logging.getLogger("scapy.runtime").addHandler(handler)
self.log.setLevel(logging.DEBUG)
self.log.info('Started crouching tiger')
self.log.info('Started logger...')
return self.log
# -----------------------------------------------------------------------------
# ███████╗███╗ ██╗██╗███████╗███████╗ ███████╗████████╗ ██████╗ ██████╗
# ██╔════╝████╗ ██║██║██╔════╝██╔════╝ ██╔════╝╚══██╔══╝██╔═══██╗██╔══██╗
@ -649,6 +627,22 @@ class DataFrame:
return self.scan_df
def get_log():
log_file = os.path.abspath('/var/log/ctiger.log')
logging.basicConfig(filename=log_file, level=args.log_level,
format='%(asctime)s %(levelname)s %(message)s')
# create logger
logging.getLogger("scapy.runtime").setLevel(logging.DEBUG)
handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
logging.getLogger("scapy.runtime").addHandler(handler)
global log
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.info('Started crouching tiger')
log.info('Started logger...')
return log
# -------------------------------------------------------------------
# ██████╗ ███████╗ █████╗ ██████╗ ██████╗ ███████╗
# ██╔══██╗██╔════╝ ██╔══██╗██╔══██╗██╔════╝ ██╔════╝
@ -669,12 +663,7 @@ def process_args(args: argparse.Namespace) -> None:
None
"""
logging.basicConfig(filename=args.log_file, level=args.log_level,
format='%(asctime)s %(levelname)s %(message)s')
# create logger
global log
log = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
log = get_log()
log.info('Started crouching tiger')
log.info('Started logger...')
match args.module: