feat(Features): 🚧 defining dhcp functionality

defining much of dhcp and feature functionality

Avoid using Hidden Dragon
This commit is contained in:
anoduck 2024-03-16 03:52:56 -04:00
parent f2c940ae37
commit 0ad9bdda8e
42 changed files with 1956 additions and 374 deletions

View file

@ -1,4 +1,187 @@
n.n.n / 2024-03-12
==================
* feat(Features): :construction: Hidden Dragon Work
* feat(Features): :triangular_flag_on_post: Begin development of Hidden Dragon
v0.4.3 / 2024-02-12
===================
* setver: set version to 0.4.3
* added to changelog
0.4.3 / 2024-02-12
==================
* style(Structure): :art: Big push, Modulization works. Bug Repairs.
* refactor(Structure): :art: More modularization
* refactor(Structure): :art: Begin Modularization
v0.4.2 / 2024-02-10
===================
* setver: set version to 0.4.2
* perf: :art: Minor Performance improvements, code cleanup, no more double entries
v0.0.5 / 2024-01-29
===================
* setver: set version to 0.0.5
* setting up versioning
* feat: :sparkles: Duplicate entry prevention
* refactor: :recycle: Massive refactoring
0.4.1 / 2024-01-26
==================
* no error, but no run either
* cleaning up purge class from redundants
* Repaired more mon_if functions...again
* Repaired more mon_if functions
* fixed mon_dev startup for attack module
* fixed thread execution channel hopper
* fixed timer context
* renamed chan_hop to do_hop
* corrected log file path & fixed threading context managers
* added context to threads
0.4.0 / 2024-01-23
==================
* refactor: :art: 0.4.0 : Code Cleanup, remove async
1.1.3 / 2023-12-09
==================
* breaking down into modules
* asyncio swapped for trio
* cleaning up readme
* Delete clients.csv
* Delete clients.txt
* Delete essid.txt
* Delete aps.txt
* Delete probe-iterator.py
* Delete ssids.txt
* Delete APs.csv
* added to readme
* new method of CTS vector, nicely done
* before big cleaning of new method
* Merge pull request 'classified' (#3) from classified into master
* Merge branch 'master' into classified
* corrected transport layer
* cleaned up ignore file
* Merge pull request 'classified -> master' (#2) from classified into master
* Whew, what a bitch...milestone done
* corrected log formatting
* testing
* a few hiccups
* significant improvement in class formation
* significant improvement in class formation
* Commiting to classified_dev
* added pcap to gitignore
* Working with new way to parse args for classes
* Merge pull request 'classified' (#1) from classified into master
* Stops on keyboard interrupt, like it should.
* works, be needs to be cleaned up and further classified
* Working on classes
* like a top
* better run configuration
* corrected cts frame
* sniffers both async
* added bit about unique mac.
* new configuration for macpurger stops after first sent packet.
* playing with packet intervals
* finished up and readme
* Simplification of entire process
* Cleaning up
* wrapping up cts vector
* Works, but runs forever
* Testing CTS vector
* It Works!
* returned to more sloppy code;
* should be running round robin
* stopping for the night
* fixed daemon flag
* just errrggg
* removed faker-wifi-essid dependency
* added changing of mac addresses
* logging setup, daemonizing resolved, poetry is no longer required
* added ability to daemonize
* added to readme
* no error output, time for testing
* corrected spelling of system
* troubleshooting and debugging
* wrapping up attack mode
* creation attack mode
* working on vector
* added to gitignore
* formulating attack vector
* finally receiving output from scapy.sniff()
* Hardware issues discovered
* running tests
* configured configobj
* correcting more .gitignore
* correcting .gitignore
* mods to sniff
* might be running
* invalid interface error present
* Scapy-Scan-untested
0.1.2 / 2023-10-15
==================
* Stops on keyboard interrupt, like it should.
* works, be needs to be cleaned up and further classified
* Working on classes
* like a top
* better run configuration
* corrected cts frame
* sniffers both async
* added bit about unique mac.
* new configuration for macpurger stops after first sent packet.
* playing with packet intervals
* finished up and readme
* Simplification of entire process
* Cleaning up
* wrapping up cts vector
* Works, but runs forever
* Testing CTS vector
* It Works!
* returned to more sloppy code;
roundrobin / 2023-10-03
=======================
* should be running round robin
* stopping for the night
* fixed daemon flag
* just errrggg
* removed faker-wifi-essid dependency
* added changing of mac addresses
* logging setup, daemonizing resolved, poetry is no longer required
* added ability to daemonize
* added to readme
* no error output, time for testing
* corrected spelling of system
* troubleshooting and debugging
* wrapping up attack mode
* creation attack mode
* working on vector
* added to gitignore
* formulating attack vector
* finally receiving output from scapy.sniff()
* Hardware issues discovered
* running tests
* configured configobj
* correcting more .gitignore
* correcting .gitignore
* mods to sniff
* might be running
* invalid interface error present
* Scapy-Scan-untested
* underway
n.n.n / 2024-02-12
==================

View file

@ -8,10 +8,46 @@
#+EXPORT_SELECT_TAGS: EXPORT
#+EXPORT_EXCLUDE_TAGS: noexport
# ---------------------------------------
* Tasks
** dhcp
**** TODO Rework logging of clients in database
* Changelog
** unreleased
***
*** 2024.03.16
- Continued to flush out much of database integration.
- configured initial connection to db
- reworking DB operations for dhcp.
- created function to add time to creation of leases
- created function to add expiration to leases
- Mapped out modules for proxy and portal
*** 2024.03.15
- Updated Hidden Dragon init files to reflect changes and Structure
- Cleaned up unresolved references in dhcp.__main__
- Massive refactoring of dhcp service
- created function for time
- created methods for db actions
- corrected database creation file.
- Opted to load dhcp dicts in database for speed
- Worked on preventing disconnection from remote hosts.
*** 2024.03.14
- Changed mk_inf to interfaces to cover all interfaces
- Forked out HDragon into two modules; network & accesspoint
- created additional files to help map out functionality of HDragon
- I just learned the same functionality can be acquired through sockets and a lib than as scapy.
- created method for creating a dns database for the dns server implementation
- Mostly done with DNS implementation
- Broke down dhcp into classes and modules
*** 2024.03.12
- Committed changes and modified Changelog
- Added imports to __init__
- Added Beacon class to ap
- Created client dataframe to keep track of clients
- Tried to modify activity.plantuml
- split routing into separate module
- Considering creating timing module
- working on processing of net_database
*** 2024.03.11
**** :construction: Hidden Dragon Work
- Using standard csv to rearrange network database
- Created bare knuckle dhcp server
- Added function for dhcp to assign more than one IP address.

View file

@ -3,9 +3,12 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from ctiger.attack import Attack
from ctiger.mac_purge import Purge
from ctiger.proclog import ProcLog
from ctiger.dataframe import CtigerDataFrame
from ctiger.netdev import NetDev
from ctiger.__version__ import version
from .__main__ import ProcArgs
from .attack import Attack
from .mac_purge import Purge
from .proclog import ProcLog
from .dataframe import CtigerDataFrame
from .netdev import NetDev
from .hdragon import Dragon
from .spec import cfg
from .__version__ import version

View file

@ -10,52 +10,12 @@ from .proclog import ProcLog
from .mac_purge import Purge
from .attack import Attack
from .hdragon import Dragon
from .spec import cfg
from semver import Version
from .__version__ import version as _version
config_file = os.path.abspath("/etc/ctiger/config.ini")
cfg = """# Crouching Tiger Config File
# PLEASE, DO NOT LEAVE THIS FILE UNTOUCHED!
# TARGETS WILL HAVE TO BE MODIFIED IN ORDER TO WORK!
# ------------------------------------------------------------
# General Settings
# ----------------
interface = string(default='wlan0')
logging_level = option('INFO', 'DEBUG', default='DEBUG')
log_file = string(default='/var/log/ctiger.log')
# ------------------------------------------------------------
# Attack Settings
# ----------------
[ATTACK]
scan_file = string(default='ct_aps.csv')
mon_type = option('create', 'switch', default='switch')
network_file = string(default='local_networks.csv')
use_daemon = boolean(default=False)
# -----------------------------------------------------------
# Mac Purge Settings
# -----------------
[MAC_PURGE]
if_type = option('create', 'switch', default='switch')
valid_results = string(default='ct_valid.csv')
channel_list = list(default=list(1, 6, 11))
# -----------------------------------------------------------
# Hidden Dragon Settings
# -----------------------
[DRAGON]
ap_iface = string(default='mon0')
netdb_file = string(default='net_database.csv')
dragon_results = string(default='dragon_results.csv')
# -----------------------------------------------------------
"""
class ProcArgs:
@ -99,7 +59,7 @@ class ProcArgs:
dragon.hidden_dragon(ap_iface=args.ap_iface,
netdb_file=args.netdb_file,
results=args.results,
log=log)
config=args.config, log=log)
case _:
ap.print_help()
@ -217,6 +177,7 @@ class ProcArgs:
help='You must use one.')
# attack Subcommands
dest='config'
att_parse = subparse.add_parser('att', help='Attack target')
att_parse.add_argument('-s', '--scan_file', dest='scan_file',
default=config['ATTACK']['scan_file'],
@ -258,6 +219,8 @@ class ProcArgs:
hd_parse.add_argument('-r', '--results', dest='results',
default=config['DRAGON']['dragon_results'],
help='File to write results too.')
hd_parse.add_argument('-c', '--config' , dest='config',
default=config_path, help='Config file')
##################
# parse the args #

View file

@ -47,7 +47,7 @@ class Dragon:
|___/
""")
def hidden_dragon(self, ap_iface, netdb_file, results, log):
def hidden_dragon(self, ap_iface, netdb_file, results, config, log):
log.info('Starting Hidden Dragon')
netdb_file = os.path.abspath(netdb_file)
print("""
@ -55,11 +55,12 @@ class Dragon:
ap_iface: {0}
ap_db: {1}
results: {2}
config_file: {3}
Have a good day, and thanks for all the fish!
""".format(ap_iface, netdb_file, results))
""".format(ap_iface, netdb_file, results, config))
dataframe = CtigerDataFrame()
apnetwork_df = dataframe.load_apnetworks(netdb_file, log)
results_df = dataframe.gen_resdf(log)
HD = HiddenDragon()
HD.start(ap_iface=ap_iface, net_db=apnetwork_df,
results=results_df, log=log)
results=results_df, config=config, log=log)

View file

@ -3,3 +3,20 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from ..proclog import ProcLog
from __main__ import HiddenDragon
from .data import apData
from .utils import Utils
from .db import HDDB
from .time import DragonTime
from .network.__main__ import HDNetwork
from .network.tun import TunIf
from .network.wlan import WlanIF
from .network.route import RouteTraffic
from .access_point.__main__ import WifiAP
from .access_point.eap import EAPHandler
from .access_point.callbacks import Callbacks
from .access_point.dhcp.__main__ import APDHCP
from .access_point.dhcp.constants import *
from .access_point.dhcp.functions import *
from .access_point.dhcp.packet import DhcpPacket

View file

@ -3,8 +3,11 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from threading import Thread
from configobj import ConfigObj
from random import choice
from .data import apData
from .mk_inf import TunIf
from .network.tun import TunIf
from .network.wlan import WlanIF
from ctiger.dataframe import CtigerDataFrame
@ -12,8 +15,26 @@ class HiddenDragon(Thread):
def __init__(self):
Thread.__init__(self)
def start(self, ap_iface, net_db, log):
def begin_id(self, net_db, log):
index_ID = choice(net_db.index)
net_srs = net_db.loc[index_ID]
net_name = net_srs.name
net_mac = net_srs.NetID
net_crypt = net_srs.Encryption
net_chan = net_srs.Channel
def start(self, ap_iface, net_db, results, config, log):
config = ConfigObj(config)
self.db_name = config['DRAGON.DB']['db_name']
self.db_user = config['DRAGON.DB']['db_user']
self.db_pass = config['DRAGON.DB']['db_pass']
db = HDDB(self.db_name, self.db_user, self.db_pass)
db.init()
DF = CtigerDataFrame()
resdf = DF.gen_resdf(log)
iface = TunIf(ap=ap_iface, apData=apData, log=log)
tun = TunIf(ap=ap_iface, apData=apData, log=log)
wlan = WlanIF(ap=ap_iface, apData=apData, log=log)
tun.start()
wlan.start()
pass

View file

@ -0,0 +1,13 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from .__main__ import WifiAP
from .callbacks import Callbacks
from .dns.__main__ import apDNS
from .eap import EAPHandler
from .dhcp.__main__ import APDHCP
from .dhcp.constants import *
from .dhcp.functions import *
from .dhcp.packet import DhcpPacket

View file

@ -0,0 +1,58 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import subprocess
import threading
from scapy.all import sniff
from time import time, sleep
from scapy.layers.dot11 import RadioTap, conf as scapyconf
from scapy.layers.inet import TCP
from hidden_dragon.data import apData
from random import choice
class WifiAP:
"""
Functionality concerning characteristics of the access point
Such as:
- SSID
- MAC
- IP
- Channel
- Crypto
"""
def __init__(self, ap_iface, net_db, log):
self.ap_iface = ap_iface
self.net_db = net_db
self.log = log
self.running = True
self.daemon = True
self.interval = 0.1
self.start()
class FakeBeacon(threading.Thread):
"""Fake Beacon Transmitter"""
def __init__(self, ap_iface, net_db, log):
super().__init__()
self.ap_iface = ap_iface
self.net_db = net_db
self.log = log
self.running = True
self.daemon = True
self.interval = 0.1
self.start()
def start(self):
while True:
for ssid in self.ap_iface.ssids:
self.ap_iface.callbacks.cb_dot11_beacon(ssid)
# Sleep
sleep(self.interval)
def create_ap(self):
if not apData.AP_HIDE:
self.FakeBeacon(self, self.net_db, self.log)

View file

@ -0,0 +1,306 @@
from .eap import *
from rpyutils import *
from .constants import *
from scapy.layers.dot11 import *
from scapy.layers.dhcp import *
from scapy.layers.dns import DNS
from scapy.layers.inet import TCP, UDP
class Callbacks(object):
def __init__(self, ap):
self.ap = ap
self.cb_recv_pkt = self.recv_pkt
self.cb_dot11_probe_req = self.dot11_probe_resp
self.cb_dot11_beacon = self.dot11_beacon
self.cb_dot11_auth = self.dot11_auth
self.cb_dot11_ack = self.dot11_ack
self.cb_dot11_assoc_req = self.dot11_assoc_resp
self.cb_dot11_rts = self.dot11_cts
self.cb_dot1X_eap_req = self.dot1x_eap_resp
self.cb_arp_req = self.arp_resp
self.cb_dhcp_discover = self.dot11_to_tint
self.cb_dhcp_request = self.dot11_to_tint
self.cb_dns_request = self.dot11_to_tint
self.cb_other_request = self.dot11_to_tint
self.cb_tint_read = self.recv_pkt_tint
def recv_pkt(self, packet):
try:
if len(packet.notdecoded[8:9]) > 0: # Driver sent radiotap header flags
# This means it doesn't drop packets with a bad FCS itself
flags = ord(packet.notdecoded[8:9])
if flags & 64 != 0: # BAD_FCS flag is set
# Print a warning if we haven't already discovered this MAC
if not packet.addr2 is None:
printd("Dropping corrupt packet from %s" % packet.addr2, Level.BLOAT)
# Drop this packet
return
# Management
if packet.type == DOT11_TYPE_MANAGEMENT:
if packet.subtype == DOT11_SUBTYPE_PROBE_REQ: # Probe request
if Dot11Elt in packet:
ssid = packet[Dot11Elt].info
printd("Probe request for SSID %s by MAC %s" % (ssid, packet.addr2), Level.DEBUG)
# Only send a probe response if one of our own SSIDs is probed
if ssid in self.ap.ssids or (Dot11Elt in packet and packet[Dot11Elt].len == 0):
if not (self.ap.hidden and ssid != self.ap.get_ssid()):
self.cb_dot11_probe_req(packet.addr2, self.ap.get_ssid())
elif packet.subtype == DOT11_SUBTYPE_AUTH_REQ: # Authentication
if packet.addr1 == self.ap.mac: # We are the receivers
self.ap.sc = -1 # Reset sequence number
self.cb_dot11_auth(packet.addr2)
elif packet.subtype == DOT11_SUBTYPE_ASSOC_REQ or packet.subtype == DOT11_SUBTYPE_REASSOC_REQ:
if packet.addr1 == self.ap.mac:
self.cb_dot11_assoc_req(packet.addr2, packet.subtype)
# After association, start EAP session if enabled
if self.ap.ieee8021x:
self.cb_dot1X_eap_req(packet.addr2, EAPCode.REQUEST, EAPType.IDENTITY, None)
# Data packet
if packet.type == DOT11_TYPE_DATA:
if EAPOL in packet:
if packet.addr1 == self.ap.mac:
# EAPOL Start
if packet[EAPOL].type == 0x01:
self.ap.eap.reset_id()
self.dot1x_eap_resp(packet.addr2, EAPCode.REQUEST, EAPType.IDENTITY, None)
if EAP in packet:
if packet[EAP].code == EAPCode.RESPONSE: # Responses
if packet[EAP].type == EAPType.IDENTITY:
identity = str(packet[Raw])
if packet.addr1 == self.ap.mac:
# EAP Identity Response
printd("Got identity: " + identity[0:len(identity) - 4], Level.INFO)
# Send auth method LEAP
self.dot1x_eap_resp(packet.addr2, EAPCode.REQUEST, EAPType.EAP_LEAP, "\x01\x00\x08" + "\x00\x00\x00\x00\x00\x00\x00\x00" + str(identity[0:len(identity) - 4]))
if packet[EAP].type == EAPType.NAK: # NAK
method = str(packet[Raw])
method = method[0:len(method) - 4]
method = ord(method.strip("x\\"))
printd("NAK suggested method " + EAPType.convert_type(method), Level.INFO)
elif ARP in packet:
if packet[ARP].pdst == self.ap.ip.split('/')[0]:
self.cb_arp_req(packet.addr2, packet[ARP].psrc)
elif DHCP in packet:
if packet.addr1 == self.ap.mac:
if packet[DHCP].options[0][1] == 1:
self.cb_dhcp_discover(packet)
if packet[DHCP].options[0][1] == 3:
self.cb_dhcp_request(packet)
elif DNS in packet:
self.cb_dns_request(packet)
elif IP in packet:
self.cb_other_request(packet)
except Exception as err:
print("Unknown error at monitor interface: %s" % repr(err))
def recv_pkt_tint(self, packet):
try:
packet = IP(packet) # We expect an IP packet from the external interface
if BOOTP in packet:
client_mac = bytes_to_mac(packet[BOOTP].chaddr[:6])
if DHCP in packet and packet[DHCP].options[0][1] == 5: # DHCP ACK
client_ip = packet[BOOTP].yiaddr
self.ap.arp.add_entry(client_ip, client_mac)
printd("IP %s -> %s" % (client_ip, client_mac), Level.INFO)
# Forward our message to the client
self.dot11_encapsulate_ip(client_mac, packet)
else:
client_ip = packet[IP].dst
self.dot11_encapsulate_ip(self.ap.arp.get_entry(client_ip), packet)
except Exception as err:
print("Unknown error at tun interface: %s" % repr(err))
def dot11_probe_resp(self, source, ssid):
probe_response_packet = self.ap.get_radiotap_header() \
/ Dot11(subtype=5, addr1=source, addr2=self.ap.mac, addr3=self.ap.mac, SC=self.ap.next_sc()) \
/ Dot11ProbeResp(timestamp=self.ap.current_timestamp(), beacon_interval=0x0064, cap=0x2104) \
/ Dot11Elt(ID='SSID', info=ssid) \
/ Dot11Elt(ID='Rates', info=AP_RATES) \
/ Dot11Elt(ID='DSset', info=chr(self.ap.channel))
# If we are an RSN network, add RSN data to response
if self.ap.ieee8021x:
probe_response_packet[Dot11ProbeResp].cap = 0x3101
rsn_info = Dot11Elt(ID='RSNinfo', info=RSN)
probe_response_packet = probe_response_packet / rsn_info
sendp(probe_response_packet, iface=self.ap.interface, verbose=False)
def dot11_beacon(self, ssid):
# Create beacon packet
beacon_packet = self.ap.get_radiotap_header() \
/ Dot11(subtype=8, addr1='ff:ff:ff:ff:ff:ff', addr2=self.ap.mac, addr3=self.ap.mac) \
/ Dot11Beacon(cap=0x2105) \
/ Dot11Elt(ID='SSID', info=ssid) \
/ Dot11Elt(ID='Rates', info=AP_RATES) \
/ Dot11Elt(ID='DSset', info=chr(self.ap.channel))
if self.ap.ieee8021x:
beacon_packet[Dot11Beacon].cap = 0x3101
rsn_info = Dot11Elt(ID='RSNinfo', info=RSN)
beacon_packet = beacon_packet / rsn_info
# Update sequence number
beacon_packet.SC = self.ap.next_sc()
# Update timestamp
beacon_packet[Dot11Beacon].timestamp = self.ap.current_timestamp()
# Send
sendp(beacon_packet, iface=self.ap.interface, verbose=False)
def dot11_auth(self, receiver):
auth_packet = self.ap.get_radiotap_header() \
/ Dot11(subtype=0x0B, addr1=receiver, addr2=self.ap.mac, addr3=self.ap.mac, SC=self.ap.next_sc()) \
/ Dot11Auth(seqnum=0x02)
printd("Sending Authentication (0x0B)...", Level.DEBUG)
sendp(auth_packet, iface=self.ap.interface, verbose=False)
def dot11_ack(self, receiver):
ack_packet = self.ap.get_radiotap_header() \
/ Dot11(type='Control', subtype=0x1D, addr1=receiver)
print("Sending ACK (0x1D) to %s ..." % receiver)
sendp(ack_packet, iface=self.ap.interface, verbose=False)
def dot11_assoc_resp(self, receiver, reassoc):
response_subtype = 0x01
if reassoc == 0x02:
response_subtype = 0x03
assoc_packet = self.ap.get_radiotap_header() \
/ Dot11(subtype=response_subtype, addr1=receiver, addr2=self.ap.mac, addr3=self.ap.mac, SC=self.ap.next_sc()) \
/ Dot11AssoResp(cap=0x2104, status=0, AID=self.ap.next_aid()) \
/ Dot11Elt(ID='Rates', info=AP_RATES)
printd("Sending Association Response (0x01)...", Level.DEBUG)
sendp(assoc_packet, iface=self.ap.interface, verbose=False)
def dot11_cts(self, receiver):
cts_packet = self.ap.get_radiotap_header() \
/ Dot11(ID=0x99, type='Control', subtype=12, addr1=receiver, addr2=self.ap.mac, SC=self.ap.next_sc())
printd("Sending CTS (0x0C)...", Level.DEBUG)
sendp(cts_packet, iface=self.ap.interface, verbose=False)
def arp_resp(self, receiver_mac, receiver_ip):
arp_packet = self.ap.get_radiotap_header() \
/ Dot11(type="Data", subtype=0, addr1=receiver_mac, addr2=self.ap.mac, addr3=self.ap.mac, SC=self.ap.next_sc(), FCfield='from-DS') \
/ LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) \
/ SNAP(OUI=0x000000, code=ETH_P_ARP) \
/ ARP(psrc=self.ap.ip.split('/')[0], pdst=receiver_ip, op="is-at", hwsrc=self.ap.mac, hwdst=receiver_mac)class ARPHandler():
def __init__(self):
self.mutex = threading.Lock()
self.arp_table = {}
def add_entry(self, client_ip, client_mac):
self.mutex.acquire()
if client_ip not in self.arp_table:
self.arp_table[client_ip] = client_mac
self.mutex.release()
def get_entry(self, client_ip):
self.mutex.acquire()
try:
temp = self.arp_table[client_ip]
except KeyError:
temp = None
printd("Could not find IP %s in ARP table." % client_ip, Level.WARNING)
self.mutex.release()
return temp
printd("Sending ARP Response...", Level.DEBUG)
sendp(arp_packet, iface=self.ap.interface, verbose=False)
def dot1x_eap_resp(self, receiver, eap_code, eap_type, eap_data):
eap_packet = self.ap.get_radiotap_header() \
/ Dot11(type="Data", subtype=0, addr1=receiver, addr2=self.ap.mac, addr3=self.ap.mac, SC=self.ap.next_sc(), FCfield='from-DS') \
/ LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) \
/ SNAP(OUI=0x000000, code=0x888e) \
/ EAPOL(version=1, type=0) \
/ EAP(code=eap_code, id=self.ap.eap.next_id(), type=eap_type)
if not eap_data is None:
eap_packet = eap_packet / str(eap_data)
printd("Sending EAP Packet (code = %d, type = %d, data = %s)..." % (eap_code, eap_type, eap_data), Level.DEBUG)
sendp(eap_packet, iface=self.ap.interface, verbose=False)
def unspecified_raw(self, raw_data):
raw_packet = str(raw_data)
printd("Sending RAW packet...", Level.DEBUG)
sendp(raw_packet, iface=self.ap.interface, verbose=False)
def dhcp_offer(self, client_mac, client_ip, xid):
dhcp_offer_packet = self.ap.get_radiotap_header() \
/ Dot11(type="Data", subtype=0, addr1="ff:ff:ff:ff:ff:ff", addr2=self.ap.mac, SC=self.ap.next_sc(), FCfield='from-DS') \
/ LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) \
/ SNAP(OUI=0x000000, code=ETH_P_IP) \
/ IP(src=self.ap.ip, dst=client_ip) \
/ UDP(sport=67, dport=68) \
/ BOOTP(op=2, yiaddr=client_ip, siaddr=self.ap.ip, giaddr=self.ap.ip, chaddr=mac_to_bytes(client_mac), xid=xid) \
/ DHCP(options=[('message-type', 'offer')]) \
/ DHCP(options=[('subnet_mask', '255.255.255.0')]) \
/ DHCP(options=[('server_id', self.ap.ip), 'end'])
sendp(dhcp_offer_packet, iface=self.ap.interface, verbose=False)
def dhcp_ack(self, client_mac, client_ip, xid):
dhcp_ack_packet = self.ap.get_radiotap_header() \
/ Dot11(type="Data", subtype=0, addr1="ff:ff:ff:ff:ff:ff", addr2=self.ap.mac, SC=self.ap.next_sc(), FCfield='from-DS') \
/ LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) \
/ SNAP(OUI=0x000000, code=ETH_P_IP) \
/ IP(src=self.ap.ip, dst=client_ip) \
/ UDP(sport=67,dport=68) \
/ BOOTP(op=2, yiaddr=client_ip, siaddr=self.ap.ip, giaddr=self.ap.ip, chaddr=mac_to_bytes(client_mac), xid=xid) \
/ DHCP(options=[('message-type','ack')]) \
/ DHCP(options=[('server_id', self.ap.ip)]) \
/ DHCP(options=[('lease_time', 43200)]) \
/ DHCP(options=[('subnet_mask', '255.255.255.0')]) \
/ DHCP(options=[('router', self.ap.ip)]) \
/ DHCP(options=[('name_server', DEFAULT_DNS_SERVER)]) \
/ DHCP(options=[('domain', "localdomain")]) \
/ DHCP(options=['end'])
sendp(dhcp_ack_packet, iface=self.ap.interface, verbose=False)
def dot11_encapsulate_ip(self, client_mac, ip_packet):
if IP in ip_packet:
del ip_packet[IP].chksum
del ip_packet[IP].len
else:
raise Exception("Attempted to encapsulate non-IP packet.")
if UDP in ip_packet:
del ip_packet[UDP].len
response_packet = self.ap.get_radiotap_header() \
/ Dot11(type="Data", subtype=0, addr1=client_mac, addr2=self.ap.mac, SC=self.ap.next_sc(), FCfield='from-DS') \
/ LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) \
/ SNAP(OUI=0x000000, code=ETH_P_IP) \
/ str(ip_packet)
sendp(response_packet, iface=self.ap.interface, verbose=False)
def dot11_to_tint(self, pkt):
self.ap.tint.write(pkt) # Pass to third party application for handling

View file

@ -0,0 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from .__main__ import APDHCP
from .constants import *
from .functions import Encode, DhcpOptions, DhcpOption
from .packet import DhcpPacket

View file

@ -0,0 +1,144 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import socket
from .constants import DhcpConstants
from .packet import DhcpPacket
from hidden_dragon.db import HDDB
from time import time
import psycopg
import re
class APDHCP:
class DhcpAck(DhcpPacket):
def __init__(self, packet=None, **kwargs):
super(self.__class__, APDHCP.DhcpAck).__init__(packet=packet)
self.message_type = DhcpConstants.DHCP_OFFER
class Lease(object):
def __init__(self, ip=None, mac=None):
self.ip = ip
self.mac = mac
self.lease_time = time.time()
class Leases(object):
def __init__(self, subnet, start, end):
self.leases = []
self.subnet = subnet
self.start = start
self.end = end
for i in range(self.start, self.end):
ip = '%s.%s' % (self.subnet, i)
# Below is an odd line as it defines an object that is unused.
lease = APDHCP.Lease(ip=ip)
def add(self, mac, ip=None):
lease = APDHCP.Lease(mac=mac, ip=ip)
self.leases.append(lease)
def find_mac(self, mac):
for lease in self.leases:
if lease.mac == mac:
return lease
return None
def find_ip(self, ip):
for lease in self.leases:
if lease.ip == ip:
return lease
return None
class DhcpServer(object):
def __init__(self, interface, ip, subnet, start,
end, mac_addresses, leases, log):
self.port = 67
self.interface = interface
self.ip = ip
self.subnet = subnet
self.start = start
self.end = end
self.macs = mac_addresses
self.leases = leases
self.log = log
self.sock = None
self.create_lease_db()
self.bind()
def create_lease_db(self):
if not self.subnet:
self.subnet = self.ip
m = re.search(r'(?P<subnet>\d+.\d+.\d+)', self.subnet)
if not m:
print('invalid subnet')
self.leases = None
m = m.groupdict()
self.leases = APDHCP.Leases(m['subnet'], int(self.start), int(self.end))
def bind(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# 25 here corresponds to SO_BINDTODEVICE, which is not exposed by Python (presumably for portability reasons).
self.sock.setsockopt(socket.SOL_SOCKET, 25, self.interface)
self.sock.bind((self.ip, self.port))
def ack(self, dhcp):
ack = dhcp
ack.message_type = DhcpConstants.DHCP_ACK
def nak(self, dhcp):
nak = dhcp
nak.message_type = DhcpConstants.DHCP_NAK
def offer(self, dhcp):
offer = dhcp
offer.message_type = DhcpConstants.DHCP_OFFER
offer.hardware_type = 1
offer.hardware_address_length = 6
offer.hops = 0
offer.seconds_elapsed = 0
offer.boot_flags = '8000'
offer.client_ip = '0.0.0.0'
offer.your_ip = '172.16.1.72'
offer.next_server_ip = '172.16.1.1'
offer.relay_agent_ip = '172.16.1.99'
# options
offer.option.operation = DhcpConstants.DHCP_OFFER
offer.option.lease_time = 86400
offer.option.renewal_time = 43200
offer.option.rebinding_time = 75600
offer.option.subnet_mask = '255.255.255.0'
offer.option.broadcast_address = '10.1.1.255'
offer.option.dns = '10.1.1.1'
offer.option.domain_name = 'localdomain'
self.log.info(offer.to_string())
self.sock.sendto(offer.encode(), ('<broadcast>', 68))
def start_server(self, log):
log.info('dhcp server started')
while True:
data = self.sock.recv(4096)
dhcp = DhcpPacket(data)
if dhcp.message_type == DhcpConstants.DHCP_DISCOVER:
self.log.info('sending offer')
self.offer(dhcp)
if dhcp.message_type == DhcpConstants.DHCP_REQUEST:
self.log.info('sending ack')
self.ack(dhcp)

View file

@ -0,0 +1,63 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from dataclasses import dataclass
@dataclass
class DhcpConstants:
DHCP_DISCOVER: int = 1
DHCP_OFFER: int = 2
DHCP_REQUEST: int = 3
DHCP_DECLINE: int = 4
DHCP_ACK: int = 5
DHCP_NAK: int = 6
DHCP_RELEASE: int = 7
DHCP_INFORM: int = 8
@dataclass
class DhcpTypes:
INT = '_int'
HEX = '_hex'
IP = '_ip'
MAC = '_mac'
STR = '_str'
DHCP_MAGIC_COOKIE = 0x63825363
DHCP_FIELDS = [
{'id': 'op', 'name': 'message_type', 'length': 1, 'type': INT},
{'id': 'htype', 'name': 'hardware_type', 'length': 1, 'type': INT},
{'id': 'hlen', 'name': 'hardware_address_length', 'length': 1, 'type': INT},
{'id': 'hops', 'name': 'hops', 'length': 1, 'type': INT},
{'id': 'xid', 'name': 'transaction_id', 'length': 4, 'type': HEX},
{'id': 'secs', 'name': 'seconds_elapsed', 'length': 2, 'type': INT},
{'id': 'flags', 'name': 'boot_flags', 'length': 2, 'type': HEX},
{'id': 'ciaddr', 'name': 'client_ip', 'length': 4, 'type': IP},
{'id': 'yiaddr', 'name': 'your_ip', 'length': 4, 'type': IP},
{'id': 'siaddr', 'name': 'next_server_ip', 'length': 4, 'type': IP},
{'id': 'giaddr', 'name': 'relay_agent_ip', 'length': 4, 'type': IP},
{'id': 'chaddr', 'name': 'client_mac', 'length': 16, 'type': MAC},
{'id': 'sname', 'name': 'server_hostname', 'length': 64, 'type': STR},
{'id': 'filename', 'name': 'boot_filename', 'length': 128, 'type': STR},
{'id': 'magic', 'name': 'magic_cookie', 'length': 4, 'type': HEX},
]
DHCP_OPTIONS = [
{'id': 0, 'name': 'padding', 'length': 1, 'type': HEX},
{'id': 1, 'name': 'subnet_mask', 'length': 4, 'type': IP},
{'id': 3, 'name': 'router', 'length': 4, 'type': IP},
{'id': 6, 'name': 'dns', 'length': 4, 'type': IP},
{'id': 12, 'name': 'hostname', 'length': 1, 'type': STR},
{'id': 15, 'name': 'domain_name', 'length': 1, 'type': STR},
{'id': 28, 'name': 'broadcast_address', 'length': 4, 'type': IP},
{'id': 51, 'name': 'lease_time', 'length': 4, 'type': INT},
{'id': 58, 'name': 'renewal_time', 'length': 4, 'type': INT},
{'id': 59, 'name': 'rebinding_time', 'length': 4, 'type': INT},
{'id': 53, 'name': 'operation', 'length': 1, 'type': INT},
{'id': 54, 'name': 'server_id', 'length': 4, 'type': IP},
{'id': 55, 'name': 'parameter_request', 'length': 0, 'type': HEX},
{'id': 61, 'name': 'client_id', 'length': 6, 'type': MAC},
]

View file

@ -0,0 +1,28 @@
DROP TABLE IF EXISTS leases;
CREATE TABLE IF NOT EXISTS leases (
id SERIAL PRIMARY KEY,
mac MACADDR NOT NULL,
net CIDR NOT NULL,
ip INET NOT NULL,
assigned TIMESTAMP NOT NULL,
expire TIMESTAMP NOT NULL,
UNIQUE(mac, ip)
)
DROP TABLE IF EXISTS dhcp_fields;
CREATE TYPE field_type AS ENUM ('INT', 'IP', 'MAC', 'STR', 'HEX', 'TLV', 'IP');
CREATE TABLE IF NOT EXISTS dhcp_fields (
key SERIAL PRIMARY KEY,
id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
length INT NOT NULL,
UNIQUE(id, name)
)
DROP TABLE IF EXISTS dhcp_options;
CREATE TYPE option_type AS ENUM ('HEX', 'IP', 'STR', 'MAC', 'INT');
CREATE TABLE IF NOT EXISTS dhcp_options (
key SERIAL PRIMARY KEY,
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
length INT NOT NULL,
UNIQUE(id, name)
)

View file

@ -0,0 +1,79 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
class DHCPDB:
"""
DROP TABLE IF EXISTS leases;
CREATE TABLE IF NOT EXISTS leases (
id SERIAL PRIMARY KEY,
mac MACADDR NOT NULL,
ip INET NOT NULL,
assigned TIMESTAMP NOT NULL,
expire TIMESTAMP NOT NULL,
UNIQUE(mac, ip)
)
DROP TABLE IF EXISTS dhcp_fields;
CREATE TYPE field_type AS ENUM ('INT', 'IP', 'MAC', 'STR', 'HEX', 'TLV', 'IP');
CREATE TABLE IF NOT EXISTS dhcp_fields (
key SERIAL PRIMARY KEY,
id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
length INT NOT NULL,
UNIQUE(id, name)
)
DROP TABLE IF EXISTS dhcp_options;
CREATE TYPE option_type AS ENUM ('HEX', 'IP', 'STR', 'MAC', 'INT');
CREATE TABLE IF NOT EXISTS dhcp_options (
key SERIAL PRIMARY KEY,
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
length INT NOT NULL,
UNIQUE(id, name)
)
"""
def __init__(self, conn, cur):
self.conn = conn
self.cur = cur
if 'dhcp' not in self.cur.getquoted():
self.cur.execute(
open(
'ctiger/hidden_dragon/access_point/dhcp.sql', 'r'
).read())
self.conn.commit()
# mac CHAR(17) NOT NULL,
# ip INET NOT NULL,
# expire INT NOT NULL,
def insert(self, mac, ip, expire):
sql = """INSERT INTO leases (mac, ip, assigned, expire) VALUES (%s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + INTERVAL '8 hours')"""
return self.conn.execute(self.conn, sql, (mac, ip, expire))
def select_all(self, conn): return select(
conn, """SELECT mac, ip, assigned, expire FROM hosts""")
def select_ip(self, conn, ip): return self.conn.select(
conn, """SELECT * FROM hosts WHERE ip = %s""", (ip,))
def select_mac(self, conn, mac): return self.conn.select(
conn, """SELECT * FROM hosts WHERE mac = %s""", (mac,))
def select_mac_recordtype(self, conn, mac, ip): return self.conn.select(
conn, """SELECT * FROM hosts WHERE mac = %s AND ip = %s""", (mac, ip))
# return curs.fetchone()
def drop_row_id(self, conn, row_id): return self.conn.execute(
conn, """DELETE FROM hosts WHERE id = %s""", (row_id,))
def drop_row_mac(self, conn, mac): return self.conn.execute(
conn, """DELETE FROM hosts WHERE mac = %s""", (mac,))
def drop_row_mac_recordtype(self, conn, mac, ip):
return self.conn.execute(conn,
"""DELETE FROM hosts WHERE mac = %s AND ip = %s""", (mac, ip))
def drop_row_ip(self, conn, expire): return self.conn.execute(
conn, """DELETE FROM hosts WHERE expire = %s""", (expire,))

View file

@ -0,0 +1,143 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import socket
import struct
from .constants import DHCP_OPTIONS, DhcpTypes
class Encode(object):
def __init__(self, value):
self.value = value
def _int(self):
return format(self.value, '0%sx' % (2 * self.length))
def _ip(self):
return socket.inet_aton(self.value).encode('hex')
def _mac(self):
return self.value.replace(':', '').lower()
https://hclips.com/videos/8901749/ntb-ir-bgvntr-gorgeous-hotwife-at-a-bbc-party/?fr=1
def _str(self):
return ''.join(x.encode('hex') for x in self.value)
def _hex(self):
return self.value
def _tlv_encode(self, t, l, v):
return format(t, '02x') + format(l, '02x') + v
class DhcpOption(object):
def __init__(self, value=None, decode=None, id=None, name=None):
self._value = value
self._decode = decode
self.name = name
self.id = id
self.length = None
self.type = None
option = None
if id:
option = self.find_id(id)
else:
option = self.find_name(name)
if option:
for k, v in option.items():
setattr(self, k, v)
@property
def value(self):
f = getattr(self, self.type)
return f()
def _int(self):
value = format(self._value, '0%sx' % (2 * self.length))
return self._tlv_encode(self.id, self.length, value)
def _ip(self):
value = socket.inet_aton(self._value).encode('hex')
return self._tlv_encode(self.id, self.length, value)
def _mac(self):
value = value.replace(':', '').lower()
return self._tlv_encode(self.id, self.length, value)
def _str(self):
value = ''.join(x.encode('hex') for x in self._value)
return self._tlv_encode(self.id, len(value) / 2, value)
def _hex(self):
return self._value
def _tlv_encode(self, t, l, v):
return format(t, '02x') + format(l, '02x') + v
def find_id(self, id):
for option in DHCP_OPTIONS:
if id == option.get('id'):
return option
def find_name(self, name):
for option in DHCP_OPTIONS:
if name == option.get('name'):
return option
class DhcpOptions(object):
def __init__(self, options=None, **kwargs):
if options:
self._decode(options)
for k, v in kwargs.items():
setattr(self, k, v)
@property
def data(self):
return self._encode()
def _encode(self):
data = ''
for option in DHCP_OPTIONS:
value = getattr(self, option['name'], None)
if value:
data += DhcpOption(id=option['id'], value=value).value
data += 'ff'
return data
def _decode(self, tlv):
options = {}
while (tlv):
[t] = struct.unpack('B', tlv[0])
option = DhcpOption(id=t)
name = option.name or 'padding'
if name == 'end':
break
if name == 'padding':
tlv = tlv[1:]
continue
[length] = struct.unpack('B', tlv[1])
value = tlv[2:2 + length]
tlv = tlv[2 + length:]
if options.get(name, None):
options[name].append(value)
else:
options[name] = [value]
for k, v in options.items():
setattr(self, k, v)

View file

@ -0,0 +1,90 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import struct
import socket
from binascii import hexlify
from .constants import DHCP_MAGIC_COOKIE, DHCP_FIELDS
from .functions import DhcpOptions
from .functions import Encode
class DhcpPacket(object):
def __init__(self, packet=None, **kwargs):
self.message_type = None
options = None
self._defaults()
if packet:
packet = hexlify(packet)
fields = packet[0:480]
options = packet[480:]
self._decode_fields(fields)
self.option = DhcpOptions(options=options)
def _defaults(self):
self.hardware_type = 1
self.hardware_address_length = 6
self.hops = 0
self.seconds_elapsed = 0
self.boot_flags = '8000'
self.client_ip = '0.0.0.0'
self.your_ip = '0.0.0.0'
self.next_server_ip = '0.0.0.0'
self.relay_agent_ip = '0.0.0.0'
self.magic_cookie = DHCP_MAGIC_COOKIE
def _decode_fields(self, data):
for option in DHCP_FIELDS:
l = option['length']
f = getattr(self, '%s_decode' % (option['type']))
value = f(data[:l * 2])
data = data[l * 2:]
setattr(self, option['name'], value)
def _encode_fields(self):
data = ''
for option in DHCP_FIELDS:
value = getattr(self, option['name'], None)
l = option['length']
f = getattr(self, '%s_encode' % (option['type']))
encoded = f(value, option['length'])
data += f(value, option['length'])
return data
def to_string(self):
return self._encode_fields() + self.option.data
def encode(self):
packet = self.to_string()
encoded = packet.decode('hex')
return encoded
def _int_encode(self, value, length):
value = value or 0
return format(int(value), '0%sx' % (2 * length))
def _int_decode(self, value):
return int(value, 16)
def _ip_encode(self, value, length=8):
value = value or '0.0.0.0'
return socket.inet_aton(value).encode('hex')
def _ip_decode(self, value):
return socket.inet_ntoa(value.decode('hex'))
def _mac_encode(self, value, length):
value = value or '00:00:00:00:00:00'
value = value.replace(':', '').lower()
return value.ljust(2 * length, '0')
def _mac_decode(self, value):
value = value.lower()
return ':'.join(a + b for a, b in zip(value[::2], value[1::2]))

View file

@ -0,0 +1,6 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from .__main__ import apDNS

View file

@ -0,0 +1,105 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from dnslib import QTYPE, DNSRecord, DNSHeader, select_host
import threading
import socket
from hidden_dragon.data import apData
class apDNS:
def __init__(self):
pass
# Function to handle DNS queries and return a response
class DNSAM:
def __init__(self, qname, qtype):
self.qname = qname
self.qtype = qtype
def get_name(self, name):
if not name.endswith('.'): name = f'{name}.'
return name
def get_record_type(self, record_type):
if record_type == 'A': return QTYPE.A
elif record_type == 'NS': return QTYPE.NS
elif record_type == 'AAAA': return QTYPE.AAAA
elif record_type == 'MX': return QTYPE.MX
return None
def handle_dns_query(conn, data, upstream_server, upstream_port):
request = DNSRecord.parse(data)
reply = DNSRecord(DNSHeader(id=request.header.id,
qr=1, aa=1, ra=1), q=request.q)
qname = str(request.q.qname)
qtype = request.q.qtype
# if qtype == QTYPE.NS: qt = 'NS'
# elif qtype == QTYPE.A: qt = 'A'
# else:
# a = request.send(upstream_server, upstream_port, tcp=False, timeout=10)
# request.add_answer(a)
# return request.pack()
print(f'A qname: {qname}, qtype: {qtype}')
res = select_hostname_recordtype(conn, qname, qtype)
print(f'B res {type(res)}: {res}')
assert isinstance(res, list)
if not res:
a = request.send(upstream_server, upstream_port, tcp=False, timeout=10)
return a
res = res[0]
print(f'C res {type(res)}: {res}')
assert isinstance(res, tuple)
if not res:
a = request.send(upstream_server, upstream_port, tcp=False, timeout=10)
return a
res = res[3]
print(f'D res {type(res)}: {res}')
assert isinstance(res, str)
if not res:
a = request.send(upstream_server, upstream_port, tcp=False, timeout=10)
return a
# res = res[3]
# if not res:
# a = request.send(upstream_server, upstream_port, tcp=False, timeout=10)
# return a
print(f'E qname: {qname}, qtype: {qtype}, res: {res}')
# if qname in dns_records and qtype in dns_records[qname]:
# if res:
if qtype == QTYPE.NS:
reply.add_answer(RR(rname=qname, rtype=qtype, rdata=NS(res)))
else:
reply.add_answer(RR(rname=qname, rtype=qtype, rdata=A(res)))
# else:
# # TODO
# #reply.add_answer(RR(rname=qname, rtype=qtype, rdata=A('0.0.0.0')))
# #q = DNSRecord(q=DNSQuestion(qname))
# a = reply.send(upstream_server, upstream_port, tcp=False, timeout=10)
# reply.add_answer(a)
return reply.pack()
# Function to start the DNS server and listen for requests
def start_server(conn, host='', port=53, upstream_server='8.8.8.8', upstream_port=53):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind((host, port))
print(f'DNS server listening on port {port}... \n \n')
while True:
try:
data, address = server_socket.recvfrom(1024)
response = handle_dns_query(conn, data, upstream_server, upstream_port)
server_socket.sendto(response, address)
except:
pass

View file

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS hosts;
CREATE TYPE record_type AS ENUM ('A', 'AAAA', 'CNAME', 'MX', 'NS', 'PTR', 'SOA', 'TXT');
CREATE TABLE IF NOT EXISTS hosts (
id SERIAL PRIMARY KEY,
hostname VARCHAR(255) NOT NULL,
record INT NOT NULL,
ip INET NOT NULL,
UNIQUE(hostname, record)
)

View file

@ -0,0 +1,45 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
class Dns_DB:
def __init__(self, conn, cur):
self.conn = conn
self.cur = cur
if 'dns' not in self.cur.getquoted():
self.cur.execute(
open(
'ctiger/hidden_dragon/access_point/dns.sql', 'r'
).read())
self.conn.commit()
def insert(self, hostname, record, ip):
sql = """INSERT INTO dns (hostname, record, ip) VALUES (%s, %s, %s)"""
return self.conn.execute(self.conn, sql, (hostname, record, ip))
def select_all(self, conn): return select(
conn, """SELECT hostname, record, ip FROM hosts""")
def select_ip(self, conn, ip): return self.conn.select(
conn, """SELECT * FROM hosts WHERE ip = %s""", (ip,))
def select_hostname(self, conn, hostname): return self.conn.select(
conn, """SELECT * FROM hosts WHERE hostname = %s""", (hostname,))
def select_hostname_recordtype(self, conn, hostname, record): return self.conn.select(
conn, """SELECT * FROM hosts WHERE hostname = %s AND record = %s""", (hostname, record))
# return curs.fetchone()
def drop_row_id(self, conn, row_id): return self.conn.execute(
conn, """DELETE FROM hosts WHERE id = %s""", (row_id,))
def drop_row_hostname(self, conn, hostname): return self.conn.execute(
conn, """DELETE FROM hosts WHERE hostname = %s""", (hostname,))
def drop_row_hostname_recordtype(self, conn, hostname, record):
return self.conn.execute(conn,
"""DELETE FROM hosts WHERE hostname = %s AND record = %s""", (hostname,))
def drop_row_ip(self, conn, ip): return self.conn.execute(
conn, """DELETE FROM hosts WHERE ip = %s""", (ip,))

View file

@ -1,16 +0,0 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import subprocess
from scapy.all import sniff
from time import time, sleep
from scapy.layers.dot11 import RadioTap, conf as scapyconf
from scapy.layers.inet import TCP
class wifiAP:
def pass_proxy(self, ip, port):
pass

View file

@ -1,60 +0,0 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import threading
from scapy.layers.inet import ARP
from scapy.sendrecv import AsyncSniffer
from scapy.sendrecv import sendp
from scapy.utils import PcapWriter
from ctiger.dataframe import CtigerDataFrame
from ctiger.netdev import NetDev
from ctiger.proclog import ProcLog
class Arp(NetDev):
def __init__(self, args):
super().__init__(args)
self.arp = ARP()
self.arp.op = 2
self.arp.hwdst = "ff:ff:ff:ff:ff:ff"
def send(self, pkt):
sendp(pkt, iface=self.interface, verbose=False)
def arp_handler(self, pkt):
self.arp_handler.handle_arp(pkt)
def sniff(self, stop_filter, iface):
AsyncSniffer(
stop_filter=stop_filter,
filter="arp",
iface=iface,
prn=self.arp_handler,
store=0,
timeout=0.01)
class ARPHandler():
def __init__(self):
self.mutex = threading.Lock()
self.arp_table = {}
def add_entry(self, client_ip, client_mac):
self.mutex.acquire()
if client_ip not in self.arp_table:
self.arp_table[client_ip] = client_mac
self.mutex.release()
def get_entry(self, client_ip):
self.mutex.acquire()
try:
temp = self.arp_table[client_ip]
except KeyError:
temp = None
printd("Could not find IP %s in ARP table." %
client_ip, Level.WARNING)
self.mutex.release()
return temp

View file

@ -50,6 +50,7 @@ class apData:
DHCP_MIN: str = "10.0.2.2"
DHCP_MAX: str = "10.0.2.62"
DHCP_EXPIRY: int = 28800
AP_HIDE: bool = False
@dataclass
class EAPCode:

View file

@ -0,0 +1,48 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import psycopg
import subprocess
class HDDB(subprocess.Popen):
def __init__(self, db_name, db_user, db_pass):
"""
Initialize the database connection with the given database name, username, and password.
Parameters:
db_name (str): The name of the database.
db_user (str): The username for the database.
db_pass (str): The password for the database.
Returns:
None
"""
self.db_name = db_name
self.db_user = db_user
self.db_pass = db_pass
self.conn = None
self.cur = None
self.state = 0
def open(self):
self.conn = psycopg.connect(
database=self.db_name, user=self.db_user, password=self.db_pass)
self.state = 1
return self.conn
def get_cursor(self):
if self.state == 0:
self.open()
if self.cur is None:
self.cur = self.conn.cursor()
return self.cur
def close(self):
if self.cur is not None:
self.cur.close()
if self.conn is not None:
self.conn.close()
self.state = 0

View file

@ -1,89 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
"""# DHCP Server specifically for wifi
1. Craft DHCP Packets: Use Scapy to craft DHCP packets for the various stages of the DHCP process,
including Discover, Offer, Request, Acknowledgement, and more as needed. Scapy provides a
flexible way to build and manipulate network packets, including DHCP packets.
2. Bind a Socket: Create a raw socket to send and receive UDP packets using Python's socket module.
This allows you to send and receive DHCP messages on the network.
server_mac
3. Listen for DHCP Requests: Listen for DHCP client requests (DHCP Discover) on the network using
the raw socket. When a DHCP Discover packet is received, parse the packet using Scapy to extract
relevant information such as the client's MAC address and requested parameters.
4. Craft DHCP Offer: Upon receiving a DHCP Discover, craft a DHCP Offer packet using Scapy to
allocate an available IP address and other configuration parameters to the client. Send the DHCP
Offer packet using the raw socket to the client's broadcast address.
5. Handle DHCP Request: If the client sends a DHCP Request for the offered parameters, process the
request and send a DHCP Acknowledgement (ACK) packet to finalize the configuration.
6. Error Handling and Lease Management: Implement error handling for invalid requests and manage IP
address lease allocation to ensure that IP addresses are properly assigned and reclaimed.
"""
# from scapy.all import *
from scapy.layers.dhcp import DHCP
from scapy.layers.inet import IP, UDP
from scapy.layers.dot11 import Dot11
from scapy.layers.dhcp import BOOTP
from scapy.sendrecv import sniff, sendp
import subprocess
from random import choice
import time
import socket
import fcntl
import struct
from ctiger.hidden_dragon import ap
from data import apData
# Define the DHCP server IP and the network ifname to listen on
server_ip = apData.IP_ADDRESS # "10.0.2.1"
ifname = "wlan0"
class apDHCP:
def __init__(self) -> None:
self.server_ip = apData.IP_ADDRESS
self.netmask = apData.NETMASK
self.broadcast = apData.NET_BROADCAST
self.host_min = apData.DHCP_MIN
self.host_max = apData.DHCP_MAX
self.ip_net = apData.IP_NETWORK
def get_if_hwaddr(self, ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
return ''.join(['%02x' % ord(char) for char in info[18:24]])
# Create a DHCP packet handler
def handle_dhcp_packet(self, packet):
if DHCP in packet:
# Check if the packet is a DHCP Discover
if packet[DHCP].options[0][1] == 1:
# Craft DHCP Offer
min_host = self.host_min.split('.')[3]
max_host = self.host_max.split('.')[3]
cli_int = choice(range(int(min_host), int(max_host)))
client_ip = self.ip_net + '.' + str(cli_int)
dhcp_offer = Dot11(src=self.server_mac, dst="ff:ff:ff:ff:ff:ff") / IP(src=server_ip, dst="self.broadcast") / UDP(sport=67, dport=68) / BOOTP(op=2, yiaddr=client_ip, siaddr=server_ip, chaddr=packet[Dot11].src) / DHCP(options=[("message-type", "offer"), ("subnet_mask", self.netmask), "end"])
sendp(dhcp_offer, iface=ifname, verbose=0)
print("Sent DHCP Offer to", packet[Dot11].src)
# Wait for DHCP Request
dhcp_request = sniff(iface=ifname, filter="udp and (port 67 or 68)", count=1)
if DHCP in dhcp_request[0]:
# Craft DHCP Acknowledgement
dhcp_ack = Dot11(src=self.server_mac, dst=dhcp_request[0][Dot11].src) / IP(src=server_ip, dst="self.broadcast") / UDP(sport=67, dport=68) / BOOTP(op=2, yiaddr=client_ip, siaddr=server_ip, chaddr=dhcp_request[0][Dot11].src) / DHCP(options=[("message-type", "ack"), ("subnet_mask", self.netmask), "end"])
sendp(dhcp_ack, iface=ifname, verbose=0)
print("Sent DHCP Acknowledgement to", dhcp_request[0][Dot11].src)
def start_dhcp(self):
self.server_mac = self.get_if_hwaddr(ifname)
# Start sniffing DHCP traffic
sniff(iface=ifname, filter="udp and (port 67 or 68)", prn=self.handle_dhcp_packet)

View file

@ -1,148 +0,0 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import fcntl
import struct
import os
import threading
import subprocess
from scapy.arch import str2mac, get_if_raw_hwaddr
from scapy.layers.inet import IP
from .data import apData
from warnings import warn
class TunIf(threading.Thread):
def __init__(self, ap, apData, log, name="fakeap"):
threading.Thread.__init__(self)
self.apData = apData
self.log = log
self.dev = apData.dev
if len(name) > apData.IFNAMSIZ:
raise Exception(
"Tun interface name cannot be larger than " + str(
apData.IFNAMSIZ))
self.name = name
self.setDaemon(True)
self.ap = ap
# Virtual interface
self.fd = open('/dev/net/tun', 'r+b')
ifr_flags = apData.IFF_TUN | apData.IFF_NO_PI # Tun device without packet information
ifreq = struct.pack('16sH', name, ifr_flags)
fcntl.ioctl(self.fd, apData.TUNSETIFF, ifreq) # Syscall to create interface
# Assign IP and bring interface up
# set_ip_address(name, self.ap.ip)
# def set_ip_address(dev, ip):
if subprocess.call(['ip', 'addr', 'add', apData.ip, 'dev', self.dev]):
warn("Failed to assign IP address {} to {}.".format(apData.ip,
self.dev))
if subprocess.call(['ip', 'link', 'set', 'dev', self.dev, 'up']):
warn("Failed to bring device {} up.".format(self.dev))
log.info("Created TUN interface {} at {}".format(self.name, self.ap.ip))
def write(self, pkt):
os.write(self.fd.fileno(), str(pkt[IP])) # Strip layer 2
def read(self):
raw_packet = os.read(self.fd.fileno(), apData.DOT11_MTU)
return raw_packet
def close(self):
os.close(self.fd.fileno())
def run(self):
while True:
raw_packet = self.read()
self.ap.callbacks.cb_tint_read(raw_packet)
class RouteTraffic(threading.Thread):
def __init__(self, apIface, apData, log) -> None:
if subprocess.call(['iptables', '--table', 'nat', '--append',
'POSTROUTING', '--out-interface', apIface, '-j',
'MASQUERADE']):
warn("Failed to setup postrouting for {}.".format(apIface))
if subprocess.call(['iptables', '--append', 'FORWARD',
'--in-interface', apData.dev, '-j', 'ACCEPT']):
warn("Failed to setup forwarding for {}.".format(apData.dev))
if subprocess.call(['sysctl', '-w', 'net.ipv4.ip_forward=1']):
warn("Failed to enable IP forwarding.")
log.info("Created route traffic for {}".format(apIface))
pass
def set_monitor_mode(wlan_dev, enable=True):
monitor_dev = None
if enable:
result = subprocess.check_output(['airmon-ng', 'start', wlan_dev])
if not "monitor mode enabled on" in result:
printd(clr(Color.RED, "ERROR: Airmon could not enable monitor mode on device %s. Make sure you are root, and that"
"your wlan card supports monitor mode." % wlan_dev), Level.CRITICAL)
exit(1)
monitor_dev = re.search(
r"monitor mode enabled on (\w+)", result).group(1)
printd("Airmon set %s to monitor mode on %s" %
(wlan_dev, monitor_dev), Level.INFO)
else:
subprocess.check_output(['airmon-ng', 'stop', wlan_dev])
return monitor_dev
def set_ip_address(dev, ip):
if subprocess.call(['ip', 'addr', 'add', ip, 'dev', dev]):
printd("Failed to assign IP address %s to %s." %
(ip, dev), Level.CRITICAL)
if subprocess.call(['ip', 'link', 'set', 'dev', dev, 'up']):
printd("Failed to bring device %s up." % dev, Level.CRITICAL)
def clear_ip_tables():
if subprocess.call(['iptables', '--flush']):
printd("Failed to flush iptables.", Level.CRITICAL)
if subprocess.call(['iptables', '--table', 'nat', '--flush']):
printd("Failed to flush iptables NAT.", Level.CRITICAL)
if subprocess.call(['iptables', '--delete-chain']):
printd("Failed to delete iptables chain.", Level.CRITICAL)
if subprocess.call(['iptables', '--table', 'nat', '--delete-chain']):
printd("Failed to delete iptables NAT chain.", Level.CRITICAL)
def hex_offset_to_string(byte_array):
temp = byte_array.replace("\n", "")
temp = temp.replace(" ", "")
return temp.decode("hex")
def get_frequency(channel):
if channel == 14:
freq = 2484
else:
freq = 2407 + (channel * 5)
freq_string = struct.pack("<h", freq)
return freq_string
def mac_to_bytes(mac):
return ''.join(chr(int(x, 16)) for x in mac.split(':'))
def bytes_to_mac(byte_array):
return ':'.join("{:02x}".format(ord(byte)) for byte in byte_array)
# Scapy sees mon0 interface as invalid address family, so we write our own
def if_hwaddr(iff):
return str2mac(get_if_raw_hwaddr(iff)[1])

View file

@ -0,0 +1,9 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from .__main__ import HDNetwork
from .tun import TunIf
from .wlan import WlanIF
from .route import RouteTraffic

View file

@ -0,0 +1,23 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import os
from .tun import TunIf
from .wlan import WlanIF
from .route import RouteTraffic
class HDNetwork:
def create_failsafe(self):
"""
Create failsafe to prevent being disconnected from a remote host.
1. Add IP address to the bridge
2. Set the bridge to up
3. Setup a backup default route
"""
os.system('ip addr add 192.168.1.99/24 dev br0')
os.system('ip route add default via 192.168.1.1 dev br0')
os.system('ip route add 192.168.1.0/24 via 192.168.1.1 dev br0')
os.system('ufw allow from 192.168.1.0/24 to any port 22')

View file

@ -0,0 +1,5 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT

View file

@ -0,0 +1,5 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT

View file

@ -0,0 +1,50 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import subprocess
import threading
from warnings import warn
import os
class RouteTraffic(threading.Thread):
def __init__(self, apIface, apData, log) -> None:
threading.Thread.__init__(self)
self.apIface = apIface
self.apData = apData
self.log = log
self.daemon = True
self.start()
def clear_ip_tables(log):
"""Avoid calling if it can be helped."""
if subprocess.call(['iptables', '--flush']):
log.info("Failed to flush iptables.")
if subprocess.call(['iptables', '--table', 'nat', '--flush']):
log.info("Failed to flush iptables NAT.")
if subprocess.call(['iptables', '--delete-chain']):
log.info("Failed to delete iptables chain.")
if subprocess.call(['iptables', '--table', 'nat', '--delete-chain']):
log.info("Failed to delete iptables NAT chain.")
def setup_forwarding(self, apIface, apData, log):
if subprocess.call(['iptables', '--table', 'nat', '--append',
'POSTROUTING', '--out-interface', apIface, '-j',
'MASQUERADE']):
warn("Failed to setup postrouting for {}.".format(apIface))
if subprocess.call(['iptables', '--append', 'FORWARD',
'--in-interface', apData.dev, '-j', 'ACCEPT']):
warn("Failed to setup forwarding for {}.".format(apData.dev))
if subprocess.call(['sysctl', '-w', 'net.ipv4.ip_forward=1']):
warn("Failed to enable IP forwarding.")
log.info("Created route traffic for {}".format(apIface))
pass
def config_route(self, apIface):
os.system(f'ip route add default dev {apIface}')
def route_traffic(self, apIface, apData, log):
self.clear_ip_tables(log)
self.setup_forwarding(apIface, apData, log)

View file

@ -0,0 +1,65 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import fcntl
import struct
import os
import threading
import subprocess
from scapy.layers.inet import IP
from .data import apData
from warnings import warn
class TunIf(threading.Thread):
"""
Creation of tun interface for AP
"""
def __init__(self, ap, apData, log, name="fakeap"):
threading.Thread.__init__(self)
self.apData = apData
self.log = log
self.dev = apData.dev
if len(name) > apData.IFNAMSIZ:
raise Exception(
"Tun interface name cannot be larger than " + str(
apData.IFNAMSIZ))
self.name = name
self.setDaemon(True)
self.ap = ap
# Virtual interface
self.fd = open('/dev/net/tun', 'r+b')
ifr_flags = apData.IFF_TUN | apData.IFF_NO_PI # Tun device without packet information
ifreq = struct.pack('16sH', name, ifr_flags)
fcntl.ioctl(self.fd, apData.TUNSETIFF, ifreq) # Syscall to create interface
# Assign IP and bring interface up
# set_ip_address(name, self.ap.ip)
# def set_ip_address(dev, ip):
if subprocess.call(['ip', 'addr', 'add', apData.ip, 'dev', self.dev]):
warn("Failed to assign IP address {} to {}.".format(apData.ip,
self.dev))
if subprocess.call(['ip', 'link', 'set', 'dev', self.dev, 'up']):
warn("Failed to bring device {} up.".format(self.dev))
log.info("Created TUN interface {} at {}".format(self.name, self.ap.ip))
def write(self, pkt):
os.write(self.fd.fileno(), str(pkt[IP])) # Strip layer 2
def read(self):
raw_packet = os.read(self.fd.fileno(), apData.DOT11_MTU)
return raw_packet
def close(self):
os.close(self.fd.fileno())
def run(self):
while True:
raw_packet = self.read()
self.ap.callbacks.cb_tint_read(raw_packet)

View file

@ -0,0 +1,73 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import fcntl
import struct
import os
import threading
import subprocess
from scapy.layers.inet import IP
from ..data import apData
from warnings import warn
class Veth(threading.Thread):
"""
Creation of veth interface for AP
"""
def __init__(self, ap, apData, log, name="fakeap"):
threading.Thread.__init__(self)
self.apData = apData
self.log = log
self.dev = apData.dev
if len(name) > apData.IFNAMSIZ:
raise Exception(
"Tun interface name cannot be larger than " + str(
apData.IFNAMSIZ))
self.name = name
self.setDaemon(True)
self.ap = ap
# Virtual interface
self.fd = open('/dev/net/veth', 'r+b')
# Tun device without packet information
ifr_flags = apData.IFF_TUN | apData.IFF_NO_PI
ifreq = struct.pack('16sH', name, ifr_flags)
# Syscall to create interface
fcntl.ioctl(self.fd, apData.TUNSETIFF, ifreq)
# Assign IP and bring interface up
# set_ip_address(name, self.ap.ip)
# def set_ip_address(dev, ip):
if subprocess.call(['ip', 'addr', 'add', apData.ip, 'dev', self.dev]):
warn("Failed to assign IP address {} to {}.".format(apData.ip,
self.dev))
if subprocess.call(['ip', 'link', 'set', 'dev', self.dev, 'up']):
warn("Failed to bring device {} up.".format(self.dev))
log.info("Created TUN interface {} at {}".format(self.name, self.ap.ip))
def write(self, pkt):
os.write(self.fd.fileno(), str(pkt[IP])) # Strip layer 2
def read(self):
raw_packet = os.read(self.fd.fileno(), apData.DOT12_MTU)
return raw_packet
def close(self):
os.close(self.fd.fileno())
def run(self):
while True:
raw_packet = self.read()
self.ap.callbacks.cb_tint_read(raw_packet)

View file

@ -0,0 +1,81 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import fcntl
import struct
import threading
import subprocess
from ..data import apData
from scapy.layers.inet import IP
import os
from warnings import warn
class WlanIF(threading.Thread):
"""Creation of wlan interface for AP"""
def __init__(self, ap, apData, log, name="wlan"):
threading.Thread.__init__(self)
self.ap = ap
self.log = log
self.dev = apData.dev
if len(name) > apData.IFNAMSIZ:
raise Exception(
"Wlan interface name cannot be larger than " + str(
apData.IFNAMSIZ))
self.name = name
self.setDaemon(True)
# Virtual interface
# self.fd = open('/dev/net/tun', 'r+b')
# ifr_flags = apData.IFF_TAP | apData.IFF_NO_PI # TAP device without packet information
# ifreq = struct.pack('16sH', name, ifr_flags)
# fcntl.ioctl(self.fd, apData.TUNSETIFF, ifreq) # Syscall to create interface
# Assign IP and bring interface up
set_ip_address(name, self.ap.ip)
if subprocess.call(['ip', 'link', 'set', 'dev', name, 'up']):
warn("Failed to bring device {} up.".format(name))
log.info("Created TAP interface {} at {}".format(self.name, self.ap.ip))
def write(self):
pass
def read(self):
pass
def close(self):
os.close(self.fd.fileno())
def run(self):
pass
def set_monitor_mode(wlan_dev, log, enable=True):
monitor_dev = None
if enable:
result = subprocess.check_output(['airmon-ng', 'start', wlan_dev])
if not "monitor mode enabled on" in result:
log.info(clr(Color.RED, "ERROR: Airmon could not enable monitor mode on device %s. Make sure you are root, and that"
"your wlan card supports monitor mode." % wlan_dev), Level.CRITICAL)
exit(1)
monitor_dev = re.search(
r"monitor mode enabled on (\w+)", result).group(1)
log.info("Airmon set %s to monitor mode on %s" %
(wlan_dev, monitor_dev), Level.INFO)
else:
subprocess.check_output(['airmon-ng', 'stop', wlan_dev])
return monitor_dev
def set_ip_address(dev, ip, log):
if subprocess.call(['ip', 'addr', 'add', ip, 'dev', dev]):
log.info("Failed to assign IP address %s to %s." % (ip, dev))
if subprocess.call(['ip', 'link', 'set', 'dev', dev, 'up']):
log.info("Failed to bring device %s up." % dev)

View file

@ -0,0 +1,36 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import re
import time
from datetime import date, datetime, timedelta
class DragonTime:
@staticmethod
def get_time() -> str:
return time.strftime("%Y%m%d%H%M%S", gmtime())
def convert_to_timetime(self, dt):
time_obj = time.mktime(dt.timetuple()) + dt.microsecond / 1E6
return time_obj
def get_dt(self, datetime_obj):
date_num = re.findall(r'\d+', str(datetime_obj))
dti = list(map(int, date_num))
yr = dti[0]
mnt = dti[1]
day = dti[2]
hr = dti[3]
mnu = dti[4]
if dti[5]:
sec = dti[5]
else:
sec = 0
return datetime(yr, mnt, day, hr, mnu, sec)
def get_expire(self, expire_time) -> str:
exp_obj = datetime.now() + timedelta(seconds=int(expire_time))
expire = exp_obj.strftime("%Y%m%d%H%M%S")
return expire

View file

@ -0,0 +1,37 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import struct
from scapy.arch import str2mac, get_if_raw_hwaddr
class Utils:
def __init__(self, log) -> None:
self.log = log
def hex_offset_to_string(self, byte_array):
temp = byte_array.replace("\n", "")
temp = temp.replace(" ", "")
return temp.decode("hex")
def get_frequency(self, channel):
if channel == 14:
freq = 2484
else:
freq = 2407 + (channel * 5)
freq_string = struct.pack("<h", freq)
return freq_string
def mac_to_bytes(self, mac):
return ''.join(chr(int(x, 16)) for x in mac.split(':'))
def bytes_to_mac(self, byte_array):
return ':'.join("{:02x}".format(ord(byte)) for byte in byte_array)
# Scapy sees mon0 interface as invalid address family, so we write our own
def if_hwaddr(self, iff):
return str2mac(get_if_raw_hwaddr(iff)[1])

View file

@ -4,18 +4,28 @@
# https://opensource.org/licenses/MIT
import logging
import threading
from logging import handlers
from warnings import warn
import os
class ProcLog:
class ProcLog(threading.Thread):
def get_log(self, log_file, lev):
def __init__(self, log_file, lev):
self.log_file = log_file
self.lev = lev
if not os.path.exists(log_file):
open(log_file, 'a').close()
super().__init__()
self.daemon = True
self.start()
def __getitem__(self, log):
log = self.get_log()
return log
def get_log(self):
if not os.path.exists(self.log_file):
open(self.log_file, 'a').close()
log = logging.getLogger(__name__)
if log.hasHandlers():
log.handlers.clear()
@ -28,7 +38,7 @@ class ProcLog:
else:
warn('Invalid level. Defaulting to debug')
log.setLevel(logging.DEBUG)
handler = handlers.RotatingFileHandler(filename=log_file,
handler = handlers.RotatingFileHandler(filename=self.log_file,
mode='a', maxBytes=1024,
backupCount=2, encoding='utf-8',
delay=False)
@ -39,3 +49,13 @@ class ProcLog:
log.info('started motion detection')
log.info('Acquired Logger')
return log
def printd(string, level):
if VERBOSITY >= level:
print(string)
def set_debug_level(level):
global VERBOSITY
VERBOSITY = level

70
ctiger/spec.py Normal file
View file

@ -0,0 +1,70 @@
# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
cfg = """# Crouching Tiger Config File
# PLEASE, DO NOT LEAVE THIS FILE UNTOUCHED!
# TARGETS WILL HAVE TO BE MODIFIED IN ORDER TO WORK!
# ------------------------------------------------------------
# General Settings
# ----------------
interface = string(default='wlan0')
logging_level = option('INFO', 'DEBUG', default='DEBUG')
log_file = string(default='/var/log/ctiger.log')
# ------------------------------------------------------------
# Attack Settings
# ----------------
[ATTACK]
scan_file = string(default='ct_aps.csv')
mon_type = option('create', 'switch', default='switch')
network_file = string(default='local_networks.csv')
use_daemon = boolean(default=False)
# -----------------------------------------------------------
# Mac Purge Settings
# -----------------
[MAC_PURGE]
if_type = option('create', 'switch', default='switch')
valid_results = string(default='ct_valid.csv')
channel_list = list(default=list(1, 6, 11))
# -----------------------------------------------------------
# Hidden Dragon Settings
# -----------------------
[DRAGON]
ap_iface = string(default='wlan0')
bridge_iface = string(default='br0')
netdb_file = string(default='net_database.csv')
dragon_results = string(default='dragon_results.csv')
[DRAGON.DB]
db_name = string(default='hdb')
db_user = string(default='root')
db_pass = string(default='password')
[DRAGON.NETWORK]
ip_address = string(default="10.0.2.1")
netmask = string(default="255.255.255.192")
network = string(default="10.0.2")
broadcast = string(default="10.0.2.63")
dhcp_min = string(default="10.0.2.2")
dhcp_max = string(default="10.0.2.62")
dhcp_expire = integer(default=28800)
domain_name = string(default="localhost.localdomain")
ap_hide = boolean(default=False)
[DRAGON.AP]
tbtt = integer(default=240)
[DRAGON.DHCP]
lease_time = 86400
renewal_time = 43200
rebinding_time = 75600
# -----------------------------------------------------------
"""

47
poetry.lock generated
View file

@ -200,6 +200,28 @@ files = [
{file = "decli-0.6.1.tar.gz", hash = "sha256:ed88ccb947701e8e5509b7945fda56e150e2ac74a69f25d47ac85ef30ab0c0f0"},
]
[[package]]
name = "dhcplib"
version = "0.1.1"
description = "Pure-Python, spec-compliant DHCP-packet-processing library"
optional = false
python-versions = "*"
files = [
{file = "dhcplib-0.1.1.tar.gz", hash = "sha256:d37ddb62a0ee3710f3a47603449038adb5232aaa28158691984c52eb4b065c6f"},
]
[[package]]
name = "dnslib"
version = "0.9.24"
description = "Simple library to encode/decode DNS wire-format packets"
optional = false
python-versions = "*"
files = [
{file = "dnslib-0.9.24-py2-none-any.whl", hash = "sha256:4f26c55603ce9f961b84404f19ff03b3ca4a051eafb2b1e141ef9b96485467c6"},
{file = "dnslib-0.9.24-py3-none-any.whl", hash = "sha256:39327e695f871574198b76ef506d9691d762b5344e0d66f5f78fefe1df99e7fd"},
{file = "dnslib-0.9.24.tar.gz", hash = "sha256:ef167868a30d4ce7c90b921279d7ecfb986be8ebc530f3e6050a2ecb68707c76"},
]
[[package]]
name = "docstring-parser"
version = "0.15"
@ -496,6 +518,29 @@ files = [
[package.dependencies]
wcwidth = "*"
[[package]]
name = "psycopg"
version = "3.1.18"
description = "PostgreSQL database adapter for Python"
optional = false
python-versions = ">=3.7"
files = [
{file = "psycopg-3.1.18-py3-none-any.whl", hash = "sha256:4d5a0a5a8590906daa58ebd5f3cfc34091377354a1acced269dd10faf55da60e"},
{file = "psycopg-3.1.18.tar.gz", hash = "sha256:31144d3fb4c17d78094d9e579826f047d4af1da6a10427d91dfcfb6ecdf6f12b"},
]
[package.dependencies]
typing-extensions = ">=4.1"
tzdata = {version = "*", markers = "sys_platform == \"win32\""}
[package.extras]
binary = ["psycopg-binary (==3.1.18)"]
c = ["psycopg-c (==3.1.18)"]
dev = ["black (>=24.1.0)", "codespell (>=2.2)", "dnspython (>=2.1)", "flake8 (>=4.0)", "mypy (>=1.4.1)", "types-setuptools (>=57.4)", "wheel (>=0.37)"]
docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
pool = ["psycopg-pool"]
test = ["anyio (>=3.6.2,<4.0)", "mypy (>=1.4.1)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
[[package]]
name = "python-dateutil"
version = "2.8.2"
@ -728,4 +773,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "e1b7238acb38207fa682a920e5d770d3c6bfacc4967566e169c26a785bdb72ff"
content-hash = "38f0c96949c04be76883f0ef81faaaa7801af550eb93abef660a1653b82ce3d2"

View file

@ -18,6 +18,9 @@ getmac = "^0.9.4"
faker-wifi-essid = "^0.4.1"
simple-parsing = "^0.1.4"
semver = "^3.0.2"
psycopg = "^3.1.18"
dnslib = "^0.9.24"
dhcplib = "^0.1.1"
[tool.poetry.group.dev.dependencies]