Tôi là một lập trình viên. Trong thời gian rảnh, tôi có nghiên cứu một vài vấn đề liên quan tới dịch ngược và virus máy tính. Tôi phân tích biến thể đầu tiên của Win32-Sality vào năm 2011. Từ đó đến nay, dù Win32-Sality vẫn tiếp tục tồn tại và lây lan nhưng những thay đổi của dòng virus này không nhiều. Trước đây, tôi đã thử tìm hiểu và thực hiện nhiều phương pháp khác nhau để có thể lấy được các URL chứa trong Win32-Sality. Các URLs này thường tồn tại trong thời gian rất ngắn. Tôi cần một phương pháp tự động, hiệu quả và có thể sử dụng tài nguyên hạn chế để thực hiện. Các phương pháp sử dụng môi trường ảo hay sandbox rất hay và khả thi. Trong bài viết này, tôi muốn cung cấp một góc nhìn mới, một phương pháp tiếp cận khác để có thể tìm và phát hiện các URL. Tôi sử dụng Emulator.

Có thể nói, tôi là một fan của bộ ba công cụ: Reversing Trilogy: Capstone, Unicorn & Keystone. Đây là nhưng công cụ rất cơ bản để có thể thực hiện nhiều ý tưởng khác nhau của tôi trong quá trình phân tích hay dịch ngược. Nói vòng vo một chút, tôi biết ngôn ngữ Ruby của người Nhật. Những tài liệu, những thư viện đầu tiên đều là của người Nhật viết để hỗ trợ cho ngôn ngữ này. Tôi muốn tinh thần đó cũng có trong các sản phẩm của người Việt. Có thể đâu đó trong blog của tôi, tôi đã nhắc tới bộ ba Reversing Trilogy. Qua bài viết này, tôi muốn được một lần nữa giới thiệu Reversing Trilogy, một công cụ của người Việt nhưng chưa được thật sự nhiều người Việt biết tới.

Đôi nét về virus lây file

Trước tiên, tôi muốn các bạn có một hình dung cụ thể hơn về một virus lây file (file infector). Chúng ta tạm gọi một file thực thi chưa bị tấn công là host file (lưu ý, đây không phải là file /etc/hosts hay C:\Windows\System32\drivers\etc\hosts). host file (hay gọi tắt là host) là một file chương trình bình thường: Các file của hệ điều hành mà thực thi được (notepad.exe, calc.exe), các chương trình được cài đặt trong máy tính, các file cài đặt có phần mở rộng là exe hay các file trò chơi… Các file này có thể có phần mở rộng là .exe, .dll, .sys. Máy tính không thể hoạt động nếu không có một file thực thi nào. Một virus lây file khi vào máy tính và được thực thi, sẽ tìm kiếm các host và sửa file này: Chúng thêm vào host các đoạn mã của virus. Các đoạn mã này sẽ chạy virus lên khi host được kích hoạt, và sau khi virus đã chạy thành công, quyền thực thi sẽ trả về cho host để host tiếp tục thực hiện nhiệm vụ. Người dùng sẽ hoàn toàn không phát hiện ra đã có virus hoạt động. Host đã trở thành file bị lây (infected file). Khi người dùng copy những file bị lây này qua một máy tính khác (qua USB, qua email…), Virus sẽ tiếp tục lây lan. Đó là phương thức lây lan chủ yếu của virus lây file. Nếu trong một máy có nhiều file bị lây cùng thực thi, virus sẽ sử dụng các phương pháp như tạo mutex, event, … để virus chỉ thực thi một lần duy nhất.

Các chương trình Antivirus có nhiệm vụ bảo vệ máy tính có thể phát hiện ra các virus lây file này, gỡ bỏ đoạn mã độc hại và khôi phục lại file như trước khi bị lây. Tuy nhiên, vì không phải virus lây file nào cũng giống nhau, nên phương pháp gỡ bỏ sẽ rất đặc trưng cho từng virus. Người phân tích sẽ phải đọc và hiểu cách thức lây của virus để tìm phương pháp khôi phục file thích hợp. Không hề có một công thức chung cho việc này. Do vậy, Antivirus có thể dùng nhiều phương pháp phát hiện khác nhau dành cho virus lây file ( heuristics, emulator, phân tích theo hành vi — behavior analysis, …) nhưng tất cả chỉ dừng ở mức độ “phát hiện”. Muốn gỡ bỏ hoàn toàn, cần có sự phân tích cụ thể của người phân tích để đảm bảo chính xác. Phương pháp phát hiện và gỡ bỏ hiệu quả nhất cho dòng virus này vẫn là sử dụng chữ kí (signature): tức là một đoạn mã virus đặc trưng. Tuy nhiên, theo thời gian, các phương pháp sử dụng chữ kí sẽ khiến cho Antivirus ngày càng cồng kềnh và kém hiệu quả.

Để cải thiện khả năng “ẩn thân”, các nhà phát triển mã độc cũng nghĩ ra công cụ cho riêng mình: virus đa hình (polymorphic virus). Loại virus này có đặc điểm là sẽ mã hóa hầu hết các đoạn mã của virus, chỉ giải mã các đoạn mã đó trên bộ nhớ của chương trình khi thực thi. Do toàn bộ virus đã mã hóa, nên các Antivirus sẽ lựa chọn đoạn code giải mã (những phần code có thể coi là cố định) để làm signature. Các đoạn code giải mã này thường nhỏ. Tất nhiên, các nhà phát triển mã độc không dừng lại ở đây, họ tiếp tục cải tiến và phát triển virus siêu đa hình (Metamorphic code). Dòng này có đặc điểm là nó tạo ra các đoạn code giải mã khác nhau ở mỗi infected file. Cùng một biến thể, nhưng nếu lây 2 host, sẽ cho ra 2 đoạn code giải mã khác nhau. Có nhiều phương pháp khác nhau để làm việc này, nhưng một trong các phương pháp phổ biến là đưa các đoạn code “rác” vào code giải mã, để làm rối Antivirus. Hiện nay có khá nhiều project cho để biến đổi giống như thế này, bạn có thể thử xem qua: metame hay pymetamorph.

Phân tích Win32-Sality và phương pháp tìm các URL độc hại

Phân tích kĩ Win32-Sality, có thể thấy rằng đây là Metamorphic virus. Điều đó có nghĩa là, với cùng một mẫu virus và cùng một file bị lây, nhưng mỗi lần lây sẽ tạo ra các đoạn mã hoàn toàn khác nhau. Cần phải giải mã được toàn bộ virus để tìm được các URL được lưu trong virus. Để vượt qua các đoạn mã đa hình, tôi sử dụng emulator. Tôi dùng Unicorn-engine để chạy giả lập lại toàn bộ quá trình chạy của virus. Vì thế tôi không cần đọc hiểu thuật toán giải mã của chương trình, nhưng vẫn có thể giải mã chính xác. Win32-Sality có “đem” theo một DLL, đó là engine lây file. Tiếc rằng DLL này lại được pack bằng UPX. Đến bước này tôi sẽ phải dùng emulator một lần nữa để unpack và tìm các URL bên trong đó. Trong script, tôi có sử dụng pefile để parse nội dung của một file thực thi.

Các bước thực hiện như sau:

  • sử dụng pefile để parse file PE.
  • sử dụng unicorn-engine làm emulator để thực thi các đoạn mã giải mã
  • dùng mem_map của unicorn để copy toàn bộ file PE vào memory của emulator
  • tạo một vùng nhớ (khoảng 0x4000 — con số này tôi tự chọn) làm stack
  • thiết lập thanh ghi stack, do chương trình cần sử dụng stack trong khi thực thi. Các thanh ghi khác chỉ cần để mặc định
  • cài đặt một hàm callback để trace các lệnh đã được thực thi
  • bắt đầu chạy emulator từ entrypoint của chương trình

Trong quá trình phân tích Win32-Sality, tôi nhận thấy: sau khi thực hiện lệnh retn trong quá trình giải mã, EIP sẽ trỏ về đoạn code thật. Tuy nhiên, lệnh retn có thể xuất hiện nhiều chỗ khác nhau trong chương trình. Để tránh phát hiện nhầm, tôi có lấy 1 đoạn làm chữ kí cho Sality (hàm check_sality) để biết được đã giải mã xong hay chưa.

Sau khi giải mã thành công toàn bộ đoạn code gốc của sality, chúng ta lưu ý: DLL chứa engine lây file của sality lại được pack bằng UPX:

  • Sử dụng pefile để parse file
  • sử dụng unicorn-engine làm emulator để thực thi các đoạn mã giải mã của UPX (UPX stub code): bước này tôi làm tương tự như đã trình bày ở trên
  • Fix Import Address Table (IAT) cho PE

Ở đây, chúng ta nhận thấy có IAT: UPX stub code cần gọi các hàm Windows API trong quá trình chạy của mình để: Build IAT cho file (LoadLibraryA, GetProcAddress của thư hiện kernel32.dll) hay gọi VirtualProtect để thiết lập thuộc tính cho các section. Rõ ràng, trong emulator hoàn toàn không có hệ điều hành nên tôi sẽ không có kernel32.dll. Tôi cũng không muốn emulate cả thư viện kernel32.dll. Tôi tạo một bảng fake IAT và dùng các ngắt (interrupt) để “bắt chước” lời gọi API. Các bước như sau:

  • sử dụng keystone-engine tạo một đoạn code mô phỏng một API, gồm 2 lệnh: mov eax, numberint 0xff
  • number là một số bất kì, đại diện cho API
  • ngắt 0xff là một ngắt không ai dùng.
  • sử dụng tính năng UC_HOOK_INTR để hook interrupt

Khi một API được thực thi, EAX sẽ chứa số hiệu tương ứng với hàm cần gọi. Emulator sẽ thực thi ngắt, gọi tới hook_intr. Bên trong hàm hook, thanh ghi EAX sẽ chứa kết quả trả về của hàm API. Bằng phương pháp này, chúng ta có thể mô phỏng bất cứ API nào.

Ngoài ra, UPX stub code không phải code đa hình, chúng ta có thể biết rõ: bắt đầu của stub là lệnh pushad và kết thúc là popad. Tôi sử dụng capstone-engine để disassemble đoạn code, tìm lệnh popad và lấy đó làm địa chỉ kết thúc quá trình emulate.

Kết quả

Sau khi thực hiện tất cả các bước trên, chúng ta có được toàn bộ code của sality, không mã hóa. Chúng ta hoàn toàn toàn có thể dump nó ra file để phân tích sâu hơn. Đơn giản hơn, tôi dùng một đoạn biểu thức chính quy (regular expression) để tìm các URL.

Kết quả thực hiện với một mẫu tôi kiếm được:

Script


Đây là mã nguồn của script tôi đã sử dụng:

# Sality Extractor
# Copyright (C) 2017  quangnh89, develbranch.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# blog:  https://develbranch.com
# email: contact[at]develbranch.com

import pefile
import struct
import re
import argparse
from unicorn import *
from unicorn.x86_const import *
from capstone import *
from keystone import *
from datetime import datetime


class SalityExtractor():
    def __init__(self, sample_file=None, output_file=None):
        self.md = Cs(CS_ARCH_X86, CS_MODE_32)
        self.md.detail = True
        self.sample = sample_file
        self.output = output_file
        self.detected = False
        self.control_server = []

    # utility methods
    @staticmethod
    # display log message
    def log(msg):
        print str(datetime.now()), msg

    # dump all mapped memory to file
    def dump_to_file(self, mu, pe, filename, new_ep_rva=None, runable=True):
        memory_mapped_image = bytearray(mu.mem_read(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage))
        for section in pe.sections:
            va_adj = pe.adjust_SectionAlignment(section.VirtualAddress, pe.OPTIONAL_HEADER.SectionAlignment,
                                                pe.OPTIONAL_HEADER.FileAlignment)
            if section.Misc_VirtualSize == 0 or section.SizeOfRawData == 0:
                continue
            if section.SizeOfRawData > len(memory_mapped_image):
                continue
            if pe.adjust_FileAlignment(section.PointerToRawData, pe.OPTIONAL_HEADER.FileAlignment) > len(
                    memory_mapped_image):
                continue
            pe.set_bytes_at_rva(va_adj, bytes(memory_mapped_image[va_adj: va_adj + section.SizeOfRawData]))

        pe.write(filename)
        # set new entrypoint
        if new_ep_rva is not None:
            self.log("New entry point %08x" % new_ep_rva)
            f = open(filename, 'r+b')
            f.seek(pe.DOS_HEADER.e_lfanew + 4 + pe.FILE_HEADER.sizeof() + 0x10)
            f.write(struct.pack('<I', new_ep_rva))
            if not runable:
                f.seek(0)
                f.write('mz')
                f.close()
        print ('[+] Save to file {}'.format(filename))

    @staticmethod
    def assembler(address, assembly):
        ks = Ks(KS_ARCH_X86, KS_MODE_32)
        encoding, _ = ks.asm(assembly, address)
        return ''.join(chr(e) for e in encoding)

    # callback for tracing invalid memory access (READ or WRITE)
    # noinspection PyUnusedLocal
    @staticmethod
    def hook_mem_invalid(uc, access, address, size, value, user_data):
        # return False to indicate we want to stop emulation
        return False

    # callback for tracing fake-IAT interrupt
    # noinspection PyUnusedLocal
    def hook_intr(self, uc, intno, user_data):
        # only handle fake-IAT interrupt
        if intno != 0xff:
            print ("got interrupt %x ???" % intno)
            uc.emu_stop()
            return
        eax = uc.reg_read(UC_X86_REG_EAX)
        dll_name, address, name, _ = self.import_addrs[eax]
        if 'kernel32' in dll_name.lower():
            if name == 'LoadLibraryA':
                uc.reg_write(UC_X86_REG_EAX, 0xabababab)
            elif name == 'GetProcAddress':
                uc.reg_write(UC_X86_REG_EAX, 0xbcbcbcbc)
            elif name == 'VirtualProtect':
                uc.reg_write(UC_X86_REG_EAX, 0x1)

    # noinspection PyBroadException
    # noinspection PyUnresolvedReferences
    def emulate_sality_dll(self, memory):
        try:
            pe = pefile.PE(data=memory, fast_load=True)
        except:
            return None

        self.log("[+] Parse Sality DLL")
        pe.parse_data_directories()
        self.import_addrs = []
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                nparam = 1
                if entry.dll.lower() in 'kernel32.dll':
                    if imp.name == 'LoadLibraryA':
                        nparam = 1
                    elif imp.name == 'GetProcAddress':
                        nparam = 2
                    elif imp.name == 'VirtualProtect':
                        nparam = 4
                self.import_addrs.append((entry.dll, imp.address, imp.name, nparam))

        self.log('[+] Analyze UPX stub code')
        entry_point_code = str(pe.get_memory_mapped_image())[pe.OPTIONAL_HEADER.AddressOfEntryPoint:]
        begin_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.AddressOfEntryPoint
        end_addr = begin_addr
        for i in self.md.disasm(str(entry_point_code), begin_addr):
            if i.mnemonic.lower() in ['popad', 'popal', 'popa']:
                end_addr = i.address + 1
                break
        self.log("[+] Initialize emulator in X86-32bit mode")
        mu = Uc(UC_ARCH_X86, UC_MODE_32)
        # map memory for this emulation
        mu.mem_map(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)
        # stack
        stack_addr = 0x1000
        stack_size = 0x4000
        mu.mem_map(stack_addr, stack_size)
        # write machine code to be emulated to memory
        mu.mem_write(pe.OPTIONAL_HEADER.ImageBase, pe.get_memory_mapped_image())
        # initialize machine registers
        mu.reg_write(UC_X86_REG_ESP, stack_addr + stack_size / 2)
        # intercept invalid memory events
        mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, self.hook_mem_invalid)
        # build IAT table
        iat_addr = 0x10000
        e = self.assembler(iat_addr, 'mov eax, 1;int 0xff;ret 0xffff')
        iat_size_adj = pe.adjust_SectionAlignment(len(self.import_addrs) * len(e) + pe.OPTIONAL_HEADER.SectionAlignment,
                                                  pe.OPTIONAL_HEADER.SectionAlignment, pe.OPTIONAL_HEADER.FileAlignment)
        mu.mem_map(iat_addr, iat_size_adj)
        for i in range(len(self.import_addrs)):
            _, iat_entry, _, nparam = self.import_addrs[i]
            func_addr = iat_addr + i * len(e)
            if nparam > 1:
                c = self.assembler(func_addr, 'mov eax, %x;int 0xff;ret %x' % (i, nparam))
            else:
                c = self.assembler(func_addr, 'mov eax, %x;int 0xff;ret' % i)
            mu.mem_write(func_addr, c)
            mu.mem_write(iat_entry, struct.pack('<I', func_addr))
        # handle interrupt ourselves
        mu.hook_add(UC_HOOK_INTR, self.hook_intr)
        self.log('[+] Emulate machine code')
        mu.emu_start(begin_addr, end_addr)
        decoded_memory = mu.mem_read(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)
        return decoded_memory

    @staticmethod
    def check_sality(code):
        signature = [(0,
                      '\xE8\x00\x00\x00\x00\x5D\x8B\xC5\x81\xED\x05\x10\x40\x00\x8A\x9D\x73\x27\x40\x00\x84\xDB\x74\x13\x81\xC4'),
                     (0x23,
                      '\x89\x85\x54\x12\x40\x00\xEB\x19\xC7\x85\x4D\x14\x40\x00\x22\x22\x22\x22\xC7\x85\x3A\x14\x40\x00\x33\x33\x33\x33\xE9\x82\x00\x00\x00\x33\xDB\x64\x67\x8B\x1E\x30\x00\x85\xDB\x78\x0E\x8B\x5B\x0C')]
        for offset, s in signature:
            if s != code[offset:offset + len(s)]:
                return False
        return True

    # callback for tracing instructions
    # noinspection PyUnusedLocal
    def hook_code(self, uc, address, size, user_data):
        # I expect 'retn'
        if size != 1:
            return
        if uc.mem_read(address, size) != '\xc3':
            return
        esp = uc.reg_read(UC_X86_REG_ESP)
        sality_entrypoint = struct.unpack('<I', uc.mem_read(esp, 4))[0]
        code = uc.mem_read(sality_entrypoint, 0x100)
        if not self.check_sality(code):
            return
        self.detected = True
        uc.emu_stop()

    # noinspection PyBroadException
    def extract(self):
        if self.sample is None:
            return
        self.log("[+] Parse PE File")
        try:
            self.sample.seek(0)
        except:
            pass
        pe = pefile.PE(data=self.sample.read(), fast_load=True)
        self.log("[+] Initialize emulator in X86-32bit mode")
        mu = Uc(UC_ARCH_X86, UC_MODE_32)
        # map memory for this emulation
        mu.mem_map(pe.OPTIONAL_HEADER.ImageBase, pe.OPTIONAL_HEADER.SizeOfImage)
        # stack
        stack_addr = 0x1000
        stack_size = 0x4000
        mu.mem_map(stack_addr, stack_size)
        # write machine code to be emulated to memory
        mu.mem_write(pe.OPTIONAL_HEADER.ImageBase, pe.get_memory_mapped_image())
        # initialize machine registers
        mu.reg_write(UC_X86_REG_ESP, stack_addr + stack_size / 2)
        # tracing all instructions with customized callback
        mu.hook_add(UC_HOOK_CODE, self.hook_code)
        # intercept invalid memory events
        mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED, self.hook_mem_invalid)
        self.log('[+] Emulate machine code')
        begin_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.AddressOfEntryPoint
        end_addr = pe.OPTIONAL_HEADER.ImageBase + pe.OPTIONAL_HEADER.SizeOfImage
        try:
            mu.emu_start(begin_addr, end_addr)
        except Exception as e:
            self.log('[-] Emulator error: %s' % e)
            return
        if not self.detected:
            self.log('[-] Sality not found')
            return
        self.log("[+] Find Sality section")
        sality_section_addr = None
        eip_rva = mu.reg_read(UC_X86_REG_EIP) - pe.OPTIONAL_HEADER.ImageBase
        for section in pe.sections:
            va_adj = pe.adjust_SectionAlignment(section.VirtualAddress, pe.OPTIONAL_HEADER.SectionAlignment,
                                                pe.OPTIONAL_HEADER.FileAlignment)
            if va_adj <= eip_rva < va_adj + section.Misc_VirtualSize:
                sality_section_addr = va_adj
                break
        if sality_section_addr is None:
            self.log("[-] Sality section not found")
            return
        mapped_memory = str(mu.mem_read(pe.OPTIONAL_HEADER.ImageBase + sality_section_addr,
                                        pe.OPTIONAL_HEADER.SizeOfImage - sality_section_addr))
        self.detect_control_server(mapped_memory)
        for m in re.finditer('MZ', mapped_memory):
            sality_dll = mapped_memory[m.start():]
            decoded_sality_dll = self.emulate_sality_dll(sality_dll)
            if decoded_sality_dll is None:
                continue
            self.detect_control_server(decoded_sality_dll)
            if self.output is not None:
                self.output.write(decoded_sality_dll)
                self.log("[+] Write Sality DLL to file successfully")
        self.log("[+] Analyze Sality DLL successfully")

    def detect_control_server(self, memory):
        # detect URL
        urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', memory)
        for _ in urls:
            self.control_server.append(str(_))


def get_args():
    """This function parses and return arguments passed in"""
    # Assign description to the help doc
    parser = argparse.ArgumentParser(description='Script extracts URLs of Win32-Sality variants from a given file.')
    # Add arguments
    parser.add_argument('-z', '--zip', action='store_true')
    parser.add_argument('-p', '--password', type=str, help='Password to open zip file', required=False,
                        default=None)
    parser.add_argument('-n', '--name', type=str, help='File name in zip file', required=False, default=None)
    parser.add_argument('-d', '--dump', type=str, help='Dump sality DLL to file', required=False, default=None)
    parser.add_argument('file', nargs='?')
    # Array for all arguments passed to script
    args = parser.parse_args()
    file_name = None
    if args.file is not None and len(args.file) > 0:
        file_name = args.file
    # Return all variable values
    return file_name, args.zip, args.password, args.name, args.dump


def main():
    # Match return values from get_args()
    # and assign to their respective variables
    z = None
    file_name, is_zip, password, name, dump = get_args()
    if file_name is None:
        print "Enter file name"
        return
    if is_zip:
        from zipfile import ZipFile

        z = ZipFile(file_name)
        f = z.open(name, 'r', password)
    else:
        f = open(file_name, 'rb')
    if dump is not None:
        d = open(dump, 'wb')
    else:
        d = None
    sd = SalityExtractor(f, d)
    sd.extract()
    if len(sd.control_server) > 0:
        print sd.control_server
    else:
        print 'Found nothing'
    if f is not None:
        f.close()
    if z is not None:
        z.close()
    if d is not None:
        d.close()


if __name__ == '__main__':
    main()

Author: Quang Nguyen