new method of CTS vector, nicely done

This commit is contained in:
anoduck 2023-10-24 17:47:37 -04:00
parent 1d485bc890
commit 5da571edf0

123
ctiger.py
View file

@ -366,9 +366,9 @@ class Purge(object):
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
log.debug('Hopping on: {0}'.format(ichan))
# log.debug('Hopping on: {0}'.format(ichan))
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
log.debug('Channel set to {0}'.format(ichan))
# log.debug('Channel set to {0}'.format(ichan))
sleep(14.7)
async def mac_purge(self, interface, mon_type, valid_file, channels):
@ -405,14 +405,19 @@ class Purge(object):
def send_pkt(self, bssid) -> None:
self.bssid = bssid
idval = [16383, 26370, 32767, 65535]
durid = choice(idval)
log.debug('Extracted bssid: {0}'.format(bssid))
# idval = [16383, 32767, 65535]
# durid = choice(idval)
durid = 65535
log.debug('Selected durid: {0}'.format(durid))
log.debug('Sending packet with: {0}'.format(macaddr))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=durid)
log.debug('Sending RTS frame to {0} with type 11'.format(bssid))
sendp(new_pkt, timeout=3, verbose=0, retry=0, threaded=True)
log.debug('Sending RTS frame to {0} with type 1 and subtype 11'.format(bssid))
sendp(new_pkt, verbose=0)
return
async def get_interface(self, interface, mon_type) -> str:
self.interface = interface
@ -422,52 +427,21 @@ class Purge(object):
mon_type=self.mon_type)
return mon_if
async def random_chan(self, channels):
self.channels = channels
chlist = self.channels.split(',')
chans = [int(chan) for chan in chlist]
ichan = choice(chans)
return ichan
async def change_chan(self, mon_if, ichan):
self.mon_if = mon_if
self.ichan = ichan
log.debug('Monitor interface: {0}'.format(self.mon_if))
log.debug('New Channel: {0}'.format(self.ichan))
# iw [options] dev <devname> set channel <channel>
change_it = os.system(
'iw dev ' + self.mon_if + ' set channel ' + self.ichan
)
if change_it:
current_channel = self.ichan
return current_channel
def probe_proc(self, probe_pkts):
ptup_list = []
for ppkt in probe_pkts:
dbm_signal = ppkt.dBm_AntSignal
self.dbm_signal = dbm_signal
bssid = ppkt[Dot11FCS].addr2
self.bssid = bssid
ptuple = (bssid, dbm_signal)
ptup_list.append(ptuple)
punified = list(set(ptup_list))
return punified
def cts_prn(self, pkt):
log.info('Intercepted CTS from {0}'.format(self.bssid))
dbm_signal = pkt.dBm_AntSignal
ichan = extract_channel(pkt[Dot11])
scan_df.loc[self.bssid] = [macaddr, dbm_signal, ichan, 'N/A']
scan_df.to_csv(valid_file, mode='a')
pkt_chan = extract_channel(pkt[Dot11])
log.debug('Extracted channel: {0}'.format(pkt_chan))
scan_df.loc[self.bssid] = [macaddr, dbm_signal,
pkt_chan, 'N/A']
scan_df.to_csv(self.valid_file, mode='a')
log.info('Results written to {0}'.format(valid_file))
async def chan_timer(self):
await asyncio.sleep(14.7)
log.info('Channel timer done.')
def probe_prn(self, pkt):
bssid = pkt[Dot11FCS].addr2
self.bssid = bssid
log.info('Intercepted probe-req from {0}'.format(bssid))
log.debug('Extracted bssid: {0}'.format(bssid))
log.info('Sending RTS frame to {0}'.format(bssid))
self.send_pkt(bssid)
return
@ -492,48 +466,19 @@ class Purge(object):
global chopper
chopper = asyncio.create_task(chop)
log.info('Channel runner started.')
while True:
probe_sniff = AsyncSniffer(
iface=mon_if, prn=self.probe_prn,
filter="type mgt subtype probe-req",
monitor=True)
probe_sniff.start()
log.info('Probe sniffer started')
# presult = await self.probe_proc(probe_sniff.results)
# log.info('Processing results from probe sniffer.')
# bssid = presult[0]
# dbm_signal = presult[1]
await asyncio.sleep(0)
# await self.send_pkt(bssid)
# log.info('Sending RTS frame to {0}'.format(bssid))
# await asyncio.sleep(0)
cts_sniff = AsyncSniffer(filter='type ctl subtype cts',
iface=mon_if, prn=self.cts_prn,
monitor=True)
cts_sniff.start()
log.info('CTS sniffer started')
await asyncio.sleep(0)
# valid_cts = await self.cts_proc(cts_sniff.results)
# log.info('Validating CTS packet.')
# if valid_cts:
# scan_df.loc[bssid] = [macaddr, dbm_signal, ichan, 'N/A']
# scan_df.to_csv(valid_file, mode='a')
# log.info('Results written to {0}'.format(valid_file))
# else:
# log.info('Unable to validate CTS packet.')
# await asyncio.sleep(0)
# alltasks = asyncio.all_tasks()
# current_task = asyncio.current_task()
# alltasks.remove(asyncio.current_task)
# await asyncio.wait(alltasks)
# ttimer = asyncio.create_task(self.chan_timer())
# await ttimer
# log.info('Beginning channel hopping.')
# ichan = await self.random_chan(self.channels)
# self.ichan = ichan
# await asyncio.create_task(self.change_chan(self.mon_if, self.ichan))
# await asyncio.sleep(0)
# continue
probe_sniff = AsyncSniffer(
iface=mon_if, prn=self.probe_prn,
filter="type mgt subtype probe-req",
monitor=True)
probe_sniff.start()
log.info('Probe sniffer started')
await asyncio.sleep(0)
cts_sniff = AsyncSniffer(filter='type ctl subtype cts',
iface=mon_if, prn=self.cts_prn,
monitor=True)
cts_sniff.start()
log.info('CTS sniffer started')
await asyncio.sleep(0)
def start_purge(self) -> None:
signal.signal(signal.SIGINT, signal_handler)
@ -757,7 +702,7 @@ def get_df():
return scan_df
def signal_handler(signal, frame) -> None:
def signal_handler(signal=signal.SIGINT, frame=None) -> None:
print('You pressed Ctrl+C!')
log.info('Shutting down')
log.info('Going Down!!')
@ -766,7 +711,7 @@ def signal_handler(signal, frame) -> None:
def get_log(log_file, log_level):
logfile = os.path.abspath(log_file)
log = logging.getLogger('scapy.runtime')
log = logging.getLogger(__name__)
if log.hasHandlers():
log.handlers.clear()
# lvl = str(f'logging.{log_level}')