f2c940ae37
Many changes performed in development of Hidden Dragon hdragon command is broken for development
145 lines
5.3 KiB
Python
145 lines
5.3 KiB
Python
# Copyright (c) 2024 Anoduck
|
|
#
|
|
# This software is released under the MIT License.
|
|
# https://opensource.org/licenses/MIT
|
|
from scapy.sendrecv import AsyncSniffer
|
|
from scapy.sendrecv import sendp
|
|
from scapy.layers.dot11 import Dot11
|
|
from scapy.layers.dot11 import RadioTap
|
|
from scapy.layers.dot11 import Dot11FCS
|
|
import threading
|
|
from random import choice
|
|
from time import sleep
|
|
import os
|
|
from .dataframe import CtigerDataFrame
|
|
from .netdev import NetDev
|
|
|
|
|
|
class Purge(object):
|
|
|
|
def __getitem__(self, pkt):
|
|
return pkt
|
|
|
|
def do_hop(self) -> None:
|
|
self.log.debug('Hop args: {0}'.format(
|
|
self.do_hop.__code__.co_varnames))
|
|
thread = threading.current_thread()
|
|
self.log.debug(f'Do Hop: name={thread.name}, daemon={thread.daemon}')
|
|
while True:
|
|
ichan = choice(self.chans)
|
|
self.log.debug('Hopping on: {0}'.format(ichan))
|
|
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
|
|
self.log.debug('Channel set to {0}'.format(ichan))
|
|
sleep(14.7)
|
|
|
|
def channel_runner(self, log) -> None:
|
|
log.info('Channel Runner NG started.')
|
|
log.info('Preliminary channel list: {0}'.format(self.channels))
|
|
chanlist = self.channels.split(',')
|
|
chlist = list(set(chanlist))
|
|
chans = [int(chan) for chan in chlist]
|
|
self.chans = chans
|
|
log.info('Channel list: {0}'.format(chlist))
|
|
thread = threading.current_thread()
|
|
log.debug(
|
|
f'Channel Runner: name={thread.name}, daemon={thread.daemon}')
|
|
chop = threading.Thread(target=self.do_hop,
|
|
name='chop')
|
|
chop.start()
|
|
|
|
def send_pkt(self, bssid) -> None:
|
|
self.log.debug('Extracted bssid: {0}'.format(bssid))
|
|
durid = 65535
|
|
self.log.debug('Selected durid: {0}'.format(durid))
|
|
self.log.debug('Sending packet with: {0}'.format(self.macaddr))
|
|
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
|
|
addr1=bssid,
|
|
addr2=self.macaddr,
|
|
ID=durid)
|
|
self.log.debug(
|
|
'Sending RTS frame to {0} with type 1 and subtype 11'.format(bssid))
|
|
sendp(new_pkt, verbose=0)
|
|
|
|
def get_interface(self) -> tuple:
|
|
ndev = NetDev()
|
|
if_tup = ndev.start_monitor(interface=self.interface,
|
|
mon_type=self.mon_type,
|
|
log=self.log)
|
|
return if_tup
|
|
|
|
def extract_channel(self, layers):
|
|
retval = ''
|
|
counter = 0
|
|
|
|
try:
|
|
while True:
|
|
layer = layers[counter]
|
|
if hasattr(layer, "ID") and layer.ID == 3 and layer.len == 1:
|
|
retval = ord(layer.info)
|
|
break
|
|
else:
|
|
counter += 1
|
|
except IndexError:
|
|
pass
|
|
|
|
return retval
|
|
|
|
def cts_prn(self, pkt):
|
|
bssid = pkt[Dot11FCS].addr2
|
|
self.log.info('Intercepted CTS from {0}'.format(bssid))
|
|
dbm_signal = pkt.dBm_AntSignal
|
|
pkt_chan = self.extract_channel(pkt[Dot11])
|
|
self.log.debug('Extracted channel: {0}'.format(pkt_chan))
|
|
if bssid in self.devices:
|
|
self.log.info('Duplicate CTS from {0}'.format(bssid))
|
|
return
|
|
else:
|
|
self.devices.append(bssid)
|
|
self.scan_df.loc[bssid] = [
|
|
self.macaddr, dbm_signal, pkt_chan, 'N/A']
|
|
self.scan_df.to_csv(self.valid_file, mode='a', header=False)
|
|
self.log.info('Results written to {0}'.format(self.valid_file))
|
|
|
|
def probe_prn(self, pkt):
|
|
bssid = pkt[Dot11FCS].addr2
|
|
self.log.info('Intercepted probe-req from {0}'.format(bssid))
|
|
self.log.debug('Extracted bssid: {0}'.format(bssid))
|
|
self.log.info('Sending RTS frame to {0}'.format(bssid))
|
|
self.send_pkt(bssid)
|
|
|
|
def mac_revealer(self, interface, mon_type, valid_file, channels, log):
|
|
self.interface = interface
|
|
self.mon_type = mon_type
|
|
self.valid_file = valid_file
|
|
self.channels = channels
|
|
self.log = log
|
|
df = CtigerDataFrame()
|
|
log.info('mac revealer started')
|
|
log.info('setting up class attributes')
|
|
self.scan_df = df.get_df()
|
|
devices = df.load_df(valid_file, log)
|
|
if devices:
|
|
self.devices = devices
|
|
else:
|
|
log.info('No valid targets found')
|
|
self.devices = []
|
|
log.info('acquired Dataframe')
|
|
mon_if, macaddr = self.get_interface()
|
|
self.mon_if = mon_if
|
|
self.macaddr = macaddr
|
|
log.debug('mon_if: {0}'.format(mon_if))
|
|
log.debug('return type: {0}'.format(type(mon_if)))
|
|
log.info('interface {0} is up and running.'.format(self.mon_if))
|
|
probe_sniff = AsyncSniffer(
|
|
iface=mon_if, prn=self.probe_prn,
|
|
filter="type mgt subtype probe-req",
|
|
monitor=True)
|
|
probe_sniff.start()
|
|
cts_sniff = AsyncSniffer(filter='type ctl subtype cts',
|
|
iface=mon_if, prn=self.cts_prn,
|
|
monitor=True)
|
|
cts_sniff.start()
|
|
self.channel_runner(log)
|
|
log.info('Channel runner started.')
|
|
log.info('Probe sniffer started')
|
|
log.info('CTS sniffer started')
|