VirusTotal API를 활용하여 파일을 쉽게 분석하는 방법을 소개하겠습니다. 특히 무료 계정을 사용하는 경우 API 호출 제한에 대해 대기 시간을 처리하고, 분석 결과를 파일에 저장하는 방법에 대해서도 설명드리겠습니다. 또한, 분석 결과에 파일 해시값을 포함하고, 정상 및 악성 파일을 각각의 폴더로 분류하는 방법도 다룹니다.
※ 본 포스팅은 ChatGPT의 도움을 받아 작성한 포스트임
VirusTotal API 준비 작업
먼저, VirusTotal API를 사용하기 위해 필요한 준비 작업을 설명하겠습니다.
VirusTotal API 키 발급
VirusTotal(virustotal.com) 웹사이트에서 회원가입 후 API 키를 발급받습니다. 이는 API 호출 시 인증에 사용됩니다.
vt-py 라이브러리 설치
Python에서 VirusTotal API를 쉽게 사용할 수 있도록 터미널에서 vt-py 라이브러리를 설치합니다.
(유의사항) 처음에 라이브러리명이 vt인줄 알았다… “pip install vt” 하지 말 것!!
pip install vt-py
코드 설명
이번 프로젝트의 핵심 코드는 파일 해시를 계산하고, VirusTotal API를 통해 파일을 분석하는 것입니다. 특히 무료 계정의 API 호출 제한을 고려하여 대기 시간을 추가하는 부분이 중요한 포인트입니다. 또한, 중간에 중단되더라도 분석이 완료된 파일을 기록하여 이어서 진행할 수 있도록 구현합니다. 추가로, 분석 결과에 파일 해시값을 포함하고, 정상 및 악성 파일을 각각의 폴더로 분류합니다.
해시 계산 및 파일 해시값 추출
우선, 디렉토리 내 파일들의 해시값을 계산합니다.
import os
import hashlib
def calculate_file_hash(file_path, hash_algorithm='sha256'):
hash_func = hashlib.new(hash_algorithm)
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
return hash_func.hexdigest()
def get_hashes_for_files_in_directory(directory_path, hash_algorithm='sha256'):
file_hashes = []
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
file_hash = calculate_file_hash(file_path, hash_algorithm)
file_hashes.append((file_path, file_hash))
return file_hashes
VirusTotal API를 활용한 파일 분석
파일 해시값을 VirusTotal API에 전달하여 분석합니다. 무료 계정의 제한에 대응하기 위해 대기 시간을 추가하였습니다.
import vt
def check_file_with_virustotal(api_key, file_hash):
with vt.Client(api_key) as client:
try:
file = client.get_object(f"/files/{file_hash}")
return file.last_analysis_stats
except vt.error.APIError as e:
if e.code == 'NotFoundError':
return None # File not found in VirusTotal database
else:
raise e
def upload_file_to_virustotal(api_key, file_path):
with vt.Client(api_key) as client:
with open(file_path, "rb") as f:
analysis = client.scan_file(f)
return analysis
스캔된 파일 기록 및 중복 분석 방지
이미 스캔된 파일을 기록하여 중복 분석을 방지합니다. 또한 작업이 중간에 중지되었을 때를 대비하여 기록합니다.
def load_scanned_hashes(file_path):
if not os.path.exists(file_path):
return set()
with open(file_path, 'r', encoding='utf-8') as f:
return set(line.strip() for line in f)
def save_scanned_hash(file_path, file_hash):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(file_hash + '\n')
중복 시 아래와 같이 스킵 됐다는 로그가 뜹니다.

결과값 파일에 저장 및 파일 분류
각 파일의 분석 결과를 결과 파일에 저장하고, 정상 파일과 악성 파일을 각각의 폴더로 이동시킵니다.
from datetime import datetime
import shutil
def save_result(file_path, result):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(result + '\n')
def move_file_to_directory(file_path, target_directory):
if not os.path.exists(target_directory):
os.makedirs(target_directory)
destination_path = os.path.join(target_directory, os.path.basename(file_path))
shutil.move(file_path, destination_path)
설정 파일 로드 및 저장
설정파일을 로드하고 저장하는 기능을 추가합니다(api값).
import json
def load_settings():
if os.path.exists(settings_file_path):
with open(settings_file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_settings(settings):
with open(settings_file_path, 'w', encoding='utf-8') as f:
json.dump(settings, f, ensure_ascii=False, indent=4)
전체 코드 및 실행
이제 모든 코드를 통합하여 실행합니다. 무료 계정의 호출 제한을 고려하여 분당 4개의 파일만 분석하도록 대기 시간을 추가합니다. 또한, 결과는 날짜가 포함된 파일 이름으로 저장됩니다.
import os
import os
import hashlib
import time
import hashlib
import time
import shutil
import vt
import argparse
from datetime import datetime
import json
# 설정 파일 경로
settings_file_path = 'settings.json'
# 현재 날짜를 기반으로 결과 파일 이름 설정
today_date = datetime.now().strftime('%Y-%m-%d')
hashes_file_path = 'scanned_hashes.txt'
results_file_path = f'results_{today_date}.txt'
def calculate_file_hash(file_path, hash_algorithm='sha256'):
hash_func = hashlib.new(hash_algorithm)
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
return hash_func.hexdigest()
def get_hashes_for_files_in_directory(directory_path, hash_algorithm='sha256'):
file_hashes = []
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
file_hash = calculate_file_hash(file_path, hash_algorithm)
file_hashes.append((file_path, file_hash))
return file_hashes
def check_file_with_virustotal(api_key, file_hash):
with vt.Client(api_key) as client:
try:
file = client.get_object(f"/files/{file_hash}")
return file.last_analysis_stats
except vt.error.APIError as e:
if e.code == 'NotFoundError':
return None # File not found in VirusTotal database
else:
raise e
def upload_file_to_virustotal(api_key, file_path):
with vt.Client(api_key) as client:
with open(file_path, "rb") as f:
analysis = client.scan_file(f)
return analysis
def load_scanned_hashes(file_path):
if not os.path.exists(file_path):
return set()
with open(file_path, 'r', encoding='utf-8') as f:
return set(line.strip() for line in f)
def save_scanned_hash(file_path, file_hash):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(file_hash + '\n')
def save_result(file_path, result):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(result + '\n')
def move_file_to_directory(file_path, target_directory):
if not os.path.exists(target_directory):
os.makedirs(target_directory)
destination_path = os.path.join(target_directory, os.path.basename(file_path))
shutil.move(file_path, destination_path)
def load_settings():
if os.path.exists(settings_file_path):
with open(settings_file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_settings(settings):
with open(settings_file_path, 'w', encoding='utf-8') as f:
json.dump(settings, f, ensure_ascii=False, indent=4)
def main():
parser = argparse.ArgumentParser(description="VirusTotal 파일 검사 및 분류 스크립트")
parser.add_argument("directory_path", help="검사할 파일들이 있는 디렉토리 경로")
parser.add_argument("result_directory", help="검사 결과 파일들이 이동할 디렉토리 경로")
parser.add_argument("--api_key", help="VirusTotal API 키", required=False)
args = parser.parse_args()
settings = load_settings()
if args.api_key:
api_key = args.api_key
settings['api_key'] = api_key
save_settings(settings)
elif 'api_key' in settings:
api_key = settings['api_key']
else:
print("API 키가 제공되지 않았습니다. --api_key 인자를 사용하거나 설정 파일에 저장된 API 키를 사용하세요.")
return
# 결과 디렉토리 설정
clean_files_directory = os.path.join(args.result_directory, 'normal')
malicious_files_directory = os.path.join(args.result_directory, 'malicious')
hashes = get_hashes_for_files_in_directory(args.directory_path, 'sha256')
scanned_hashes = load_scanned_hashes(hashes_file_path)
files_checked = 0
for file_path, file_hash in hashes:
if file_hash in scanned_hashes:
print(f"File {file_path} has already been scanned. Skipping.")
continue
if files_checked > 0 and files_checked % 4 == 0:
print("API 제한으로 인해 1분 대기 중...")
time.sleep(60) # 분당 4개 파일만 분석할 수 있도록 60초 대기
try:
print(f"Checking file: {file_path}")
analysis_stats = check_file_with_virustotal(api_key, file_hash)
if analysis_stats:
malicious = analysis_stats['malicious']
result = f'File: {file_path}, Hash: {file_hash}, Malicious: {malicious}'
print(result)
save_result(results_file_path, result)
if malicious > 0:
move_file_to_directory(file_path, malicious_files_directory)
else:
move_file_to_directory(file_path, clean_files_directory)
else:
print(f"File: {file_path} not found in VirusTotal. Uploading for analysis.")
analysis = upload_file_to_virustotal(api_key, file_path)
result = f'Uploaded file {file_path} for analysis. Check VirusTotal for results. Hash: {file_hash}'
print(result)
save_result(results_file_path, result)
move_file_to_directory(file_path, clean_files_directory)
except Exception as e:
error_message = f"Error processing file {file_path}: {e}"
print(error_message)
save_result(results_file_path, error_message)
continue
save_scanned_hash(hashes_file_path, file_hash)
files_checked += 1
if __name__ == "__main__":
main()
import shutil
import vt
from datetime import datetime
# 현재 날짜를 기반으로 결과 파일 이름 설정
today_date = datetime.now().strftime('%Y-%m-%d')
hashes_file_path = 'scanned_hashes.txt'
results_file_path = f'results_{today_date}.txt'
clean_files_directory = 'clean_files' # 정상 파일을 이동할 디렉토리
malicious_files_directory = 'malicious_files' # 악성 파일을 이동할 디렉토리
# 이동할 디렉토리가 없으면 생성
os.makedirs(clean_files_directory, exist_ok=True)
os.makedirs(malicious_files_directory, exist_ok=True)
def calculate_file_hash(file_path, hash_algorithm='sha256'):
hash_func = hashlib.new(hash_algorithm)
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
return hash_func.hexdigest()
def get_hashes_for_files_in_directory(directory_path, hash_algorithm='sha256'):
file_hashes = []
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
file_hash = calculate_file_hash(file_path, hash_algorithm)
file_hashes.append((file_path, file_hash))
return file_hashes
def check_file_with_virustotal(api_key, file_hash):
with vt.Client(api_key) as client:
try:
file = client.get_object(f"/files/{file_hash}")
return file.last_analysis_stats
except vt.error.APIError as e:
if e.code == 'NotFoundError':
return None # File not found in VirusTotal database
else:
raise e
def upload_file_to_virustotal(api_key, file_path):
with vt.Client(api_key) as client:
with open(file_path, "rb") as f:
analysis = client.scan_file(f)
return analysis
def load_scanned_hashes(file_path):
if not os.path.exists(file_path):
return set()
with open(file_path, 'r', encoding='utf-8') as f:
return set(line.strip() for line in f)
def save_scanned_hash(file_path, file_hash):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(file_hash + '\n')
def save_result(file_path, result):
with open(file_path, 'a', encoding='utf-8') as f:
f.write(result + '\n')
def move_file_to_directory(file_path, target_directory):
if not os.path.exists(target_directory):
os.makedirs(target_directory)
destination_path = os.path.join(target_directory, os.path.basename(file_path))
shutil.move(file_path, destination_path)
# Example usage:
api_key = 'your_actual_virustotal_api_key' # 여기에 실제 VirusTotal API 키를 입력합니다.
directory_path = 'C:\\path\\to\\your\\directory' # 여기에 실제 파일들이 있는 디렉토리 경로를 입력합니다.
hashes = get_hashes_for_files_in_directory(directory_path, 'sha256')
scanned_hashes = load_scanned_hashes(hashes_file_path)
files_checked = 0
for file_path, file_hash in hashes:
if file_hash in scanned_hashes:
print(f"File {file_path} has already been scanned. Skipping.")
continue
if files_checked > 0 and files_checked % 4 == 0:
print("API 제한으로 인해 1분 대기 중...")
time.sleep(60) # 분당 4개 파일만 분석할 수 있도록 60초 대기
try:
print(f"Checking file: {file_path}")
analysis_stats = check_file_with_virustotal(api_key, file_hash)
if analysis_stats:
malicious = analysis_stats['malicious']
result = f'File: {file_path}, Hash: {file_hash}, Malicious: {malicious}'
print(result)
save_result(results_file_path, result)
if malicious > 0:
move_file_to_directory(file_path, malicious_files_directory)
else:
move_file_to_directory(file_path, clean_files_directory)
else:
print(f"File: {file_path} not found in VirusTotal. Uploading for analysis.")
analysis = upload_file_to_virustotal(api_key, file_path)
result = f'Uploaded file {file_path} for analysis. Check VirusTotal for results. Hash: {file_hash}'
print(result)
save_result(results_file_path, result)
# 임시로 파일을 clean_files_directory로 이동
move_file_to_directory(file_path, clean_files_directory)
except Exception as e:
error_message = f"Error processing file {file_path}: {e}"
print(error_message)
save_result(results_file_path, error_message)
continue
save_scanned_hash(hashes_file_path, file_hash)
files_checked += 1
결론
이렇게 해서 VirusTotal API를 사용하여 파일을 쉽게 분석하는 방법을 알아보았습니다. 특히 무료 계정의 호출 제한을 고려하여 대기 시간을 추가하는 방법을 통해 효율적으로 파일을 분석할 수 있었습니다. 중단된 경우에도 이어서 작업을 수행할 수 있도록 구현하였습니다.
아래와 같이 폴더별로 원본파일을 나눠서 저장합니다.
