메뉴 건너뛰기

Hello :0

1. 샘플 구하기

안드로이드 샘플을 구하기 위해 https://github.com/mssun/android-apps-crawler 의 소스코드를 수정하였다. 해당 크롤러는 scrapy 기반으로 작성되었고 Crawler과 Dwonloader루 구성 오류가 있는부분이 많다.

 

1.png

cralwer db에 정보를 저장 후 별도의 다운로드 모듈로 파일다운로드.

 

APK 파일의 특성으로 잡을 Method와 permissions들을 구한다. 해당 값들을 구하기 위해 androguard를 사용.

 

from androguard.core.bytecodes import apk
from androguard.core.bytecodes import dvm


#apk method 가져오기
def dump_method(apk_file):
    call_method=[]
    a = apk.APK(apk_file)
    d = dvm.DalvikVMFormat(a.get_dex())
    for current_class in d.get_classes():
        for method in current_class.get_methods():
            call_method.append(method.get_name())

    call_method = list(set(call_method))

    return call_method, len(call_method)

#apk 퍼미션
def dump_permissions(apk_file):
    call_permissions = []
    a = apk.APK(apk_file)
    for per in a.get_permissions():
        per=per.split(".")
        call_permissions.append(per[-1])

    call_permissions = list(set(call_permissions))

    return call_permissions, len(call_permissions)

이런식으로 APK 파일 정보를 구한다.

 

2.png

APK 파일을 정보를 구한 것들을 mongodb에 집어 넣는다. label 데이터셋의 라벨로 virustotal에서 20개 이상의 벤터사에서 탐지하는 경우 악성으로 1을 0곳에서 탐지는 경우 0으로 저장한다.

 

#-*- coding: utf-8 -*-

import pymongo
import random
import numpy as np
from sklearn import svm
from sklearn.metrics import f1_score
from sklearn.feature_extraction.text import CountVectorizer
import datetime
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression

connection = pymongo.MongoClient("127.0.0.1", 27017)
android = connection.android
apk_machine = android.apk_machine

data_list = apk_machine.find()

def get_apk_info():
    data =[]
    label =[]

    with open('./good_method.txt') as g_f:
        goos_itme = g_f.read().splitlines()

    with open('./bad_method.txt') as b_f:
        bads_itme = b_f.read().splitlines()

    unique_good = list(set(goos_itme) - set(bads_itme))
    unique_bad = list(set(bads_itme) - set(goos_itme))

    all_method_list= unique_good+unique_bad

    for line in data_list:
        one_apk_method=[]
        for single_method in line['dump_method']:
            if single_method in all_method_list:
                one_apk_method.append(single_method)
        permissions_method= set(line['dump_permissions']+one_apk_method)
        data.append((" ".join(permissions_method).lower()))
        label.append( line['label'])

    return data, label


def make_feature_vector(critique, labels):
    vectorizer = CountVectorizer(min_df=0)

    X = vectorizer.fit_transform(critique)
    vectorizer.get_feature_names()
    vec = X.toarray()#행 한게당 단어 빈도수

    labels = np.array(labels)
    labels= labels.reshape(len(labels),1)
    vec = np.hstack((vec,labels))
    return vec


def make_np_array_XY(xy):
""" takes XY (feature + lable) lists, then makes np array for X, Y """
x = xy[:,0:-1]
y = xy[:,-1]
return x,y

def main():
    print "Start learning : " +str(datetime.datetime.now())
    critiques, labels =get_apk_info()

    features_and_labels = make_feature_vector(critiques, labels)
    print "Vector Len : "+str(len(features_and_labels[0])-1)

    random.shuffle(features_and_labels)
    #print features_and_labels

    # make  train / test sets from the shuffled list
    cut = int(len(features_and_labels) * 0.95)
    XY_train = features_and_labels[:cut]
    XY_test = features_and_labels[cut:]

    X_train, Y_train = make_np_array_XY(XY_train)
    X_test, Y_test = make_np_array_XY(XY_test)
    print 'len(X_test) = %s len(Y_test) = %s' % (len(X_test), len(Y_test))

    # train set
    C = 1.0  # SVM regularization parameter

    svc = svm.SVC(kernel='linear', C=C).fit(X_train, Y_train)
    print "linear_model.SGDClassifier()..."
    from sklearn import linear_model
    #svc = linear_model.SGDClassifier().fit(X_train, Y_train)
    print "End learning : " + str(datetime.datetime.now())
    print "svc.predict()..."
    Y_predict = svc.predict(X_test)

    print 'Y_predict:\n', Y_predict
    print 'Y_test:   \n', Y_test

    # score
    test_size = len(Y_test)
    score = 0
    for i in range(test_size):
        if Y_predict[i] == Y_test[i]:
            score += 1
    print 'Got %s out of %s' % (score, test_size)
    f1 = f1_score(Y_test, Y_predict)
    print 'f1 macro = %.2f\n' % (f1)

    rf = RandomForestClassifier()
    rf.fit(X_train, Y_train)
    Y_predict = rf.predict(X_test)

    # score
    test_size = len(Y_test)
    score = 0
    for i in range(test_size):
        if Y_predict[i] == Y_test[i]:
            score += 1
    print "RandomForest Classifier"
    print 'Got %s out of %s\n' % (score, test_size)

    ab = AdaBoostClassifier()
    ab.fit(X_train, Y_train)
    Y_predict = ab.predict(X_test)

    # score
    test_size = len(Y_test)
    score = 0
    for i in range(test_size):
        if Y_predict[i] == Y_test[i]:
            score += 1
    print "AdaBoost Classifier"
    print 'Got %s out of %s\n' % (score, test_size)

    from sklearn.tree import DecisionTreeClassifier
    dt = DecisionTreeClassifier()
    dt.fit(X_train, Y_train)
    Y_predict = dt.predict(X_test)

    # score
    test_size = len(Y_test)
    score = 0
    for i in range(test_size):
        if Y_predict[i] == Y_test[i]:
            score += 1
    print "DecisionTree Classifier"
    print 'Got %s out of %s\n' % (score, test_size)

    lg = LogisticRegression()
    lg.fit(np.array(X_train, dtype=float), np.array(Y_train, dtype=float))
    Y_predict = lg.predict(np.array(X_test, dtype=float))
    a = np.array(Y_test, dtype=float)
    # score
    test_size = len(Y_test)
    score = 0
    for i in range(test_size):
        if Y_predict[i] == a[i]:
            score += 1
    print "LogisticRegression Classifier"
    print 'Got %s out of %s\n' % (score, test_size)


if __name__ == '__main__':
    main()

python의 sckit learn 라이브러리 사용코드

 

3.png

100개의 샘플에 대해서 5개의 분류법을 이용해본 결과 잘나오면 약 93~98%의 정확도를 보여준다.

수집된 샘플의 한계 때문에 좀 더 연구를 하기가 힘들었다. 완전한 악성보다는 PUP가 많기도 하고 중국 Third Party APK 파일들이라 실제 활용은 연구가 더 필요