#!/usr/bin/env python3
# Copyright (c) 2022, the R8 project authors. Please see the AUTHORS file
# for details. All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.

import argparse
import os
import subprocess
import sys
import threading
import time

from enum import Enum

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import profile_utils
import utils

DEVNULL = subprocess.DEVNULL


class ProcessReader(threading.Thread):

    def __init__(self, process):
        threading.Thread.__init__(self)
        self.lines = []
        self.process = process

    def run(self):
        for line in self.process.stdout:
            line = line.decode('utf-8').strip()
            self.lines.append(line)

    def stop(self):
        self.process.kill()


class ScreenState(Enum):
    OFF_LOCKED = 1,
    OFF_UNLOCKED = 2
    ON_LOCKED = 3
    ON_UNLOCKED = 4
    UNKNOWN = 5

    def is_off(self):
        return self == ScreenState.OFF_LOCKED or self == ScreenState.OFF_UNLOCKED

    def is_off_or_unknown(self):
        return self.is_off() or self.is_unknown()

    def is_on(self):
        return self == ScreenState.ON_LOCKED or self == ScreenState.ON_UNLOCKED

    def is_on_and_locked(self):
        return self == ScreenState.ON_LOCKED

    def is_on_and_unlocked(self):
        return self == ScreenState.ON_UNLOCKED

    def is_on_and_unlocked_or_unknown(self):
        return self.is_on_and_unlocked() or self.is_unknown()

    def is_on_or_unknown(self):
        return self.is_on() or self.is_unknown()

    def is_unknown(self):
        return self == ScreenState.UNKNOWN


def broadcast(action, component, device_id=None):
    print('Sending broadcast %s' % action)
    cmd = create_adb_cmd('shell am broadcast -a %s %s' % (action, component),
                         device_id)
    return subprocess.check_output(cmd).decode('utf-8').strip().splitlines()


def build_apks_from_bundle(bundle, output, overwrite=False):
    print('Building %s' % bundle)
    cmd = [
        'java', '-jar', utils.BUNDLETOOL_JAR, 'build-apks',
        '--bundle=%s' % bundle,
        '--output=%s' % output
    ]
    if overwrite:
        cmd.append('--overwrite')
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def capture_screen(target, device_id=None):
    print('Taking screenshot to %s' % target)
    tmp = '/sdcard/screencap.png'
    cmd = create_adb_cmd('shell screencap -p %s' % tmp, device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)
    pull(tmp, target, device_id)


def create_adb_cmd(arguments, device_id=None):
    assert isinstance(arguments, list) or isinstance(arguments, str)
    cmd = ['adb']
    if device_id is not None:
        cmd.append('-s')
        cmd.append(device_id)
    cmd.extend(
        arguments if isinstance(arguments, list) else arguments.split(' '))
    return cmd


def capture_app_profile_data(app_id, device_id=None):
    ps_cmd = create_adb_cmd('shell ps -o NAME', device_id)
    stdout = subprocess.check_output(ps_cmd).decode('utf-8').strip()
    killed_any = False
    for process_name in stdout.splitlines():
        if process_name.startswith(app_id):
            print('Flushing profile for process %s' % process_name)
            killall_cmd = create_adb_cmd(
                'shell killall -s SIGUSR1 %s' % process_name, device_id)
            killall_result = subprocess.run(killall_cmd, capture_output=True)
            stdout = killall_result.stdout.decode('utf-8')
            stderr = killall_result.stderr.decode('utf-8')
            if killall_result.returncode == 0:
                killed_any = True
            else:
                print('Error: stdout: %s, stderr: %s' % (stdout, stderr))
            time.sleep(5)
    assert killed_any, 'Expected to find at least one process'


def check_app_has_profile_data(app_id, device_id=None):
    profile_path = get_profile_path(app_id)
    cmd = create_adb_cmd(
        'shell du /data/misc/profiles/cur/0/%s/primary.prof' % app_id,
        device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    size_str = stdout[:stdout.index('\t')]
    assert size_str.isdigit()
    size = int(size_str)
    if size == 4:
        raise ValueError('Expected size of profile at %s to be > 4K' %
                         profile_path)


def clear_logcat(device_id=None):
    cmd = create_adb_cmd('logcat -c', device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def clear_profile_data(app_id, device_id=None):
    cmd = create_adb_cmd('shell cmd package compile --reset %s' % app_id,
                         device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def drop_caches(device_id=None):
    # On older devices  this used to be achieved using:
    #   adb shell echo 3 > /proc/sys/vm/drop_caches
    # This does not work on user devices, however.
    cmd = create_adb_cmd(['shell', 'setprop', 'perf.drop_caches', '3'],
                         device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def ensure_screen_on(device_id=None):
    if get_screen_state(device_id).is_off():
        toggle_screen(device_id)
    assert get_screen_state(device_id).is_on_or_unknown()


def ensure_screen_off(device_id=None):
    if get_screen_state(device_id).is_on():
        toggle_screen(device_id)
    assert get_screen_state(device_id).is_off_or_unknown()


def force_compilation(app_id, device_id=None):
    print('Applying AOT (full)')
    cmd = create_adb_cmd('shell cmd package compile -m speed -f %s' % app_id,
                         device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def force_profile_compilation(app_id, device_id=None):
    print('Applying AOT (profile)')
    cmd = create_adb_cmd(
        'shell cmd package compile -m speed-profile -f %s' % app_id, device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def get_apk_path(app_id, device_id=None):
    cmd = create_adb_cmd('shell pm path %s' % app_id, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    if not stdout.startswith('package:'):
        raise ValueError('Expected stdout to start with "package:", was: %s' %
                         stdout)
    apk_path = stdout[len('package:'):]
    if not apk_path.endswith('.apk'):
        raise ValueError('Expected stdout to end with ".apk", was: %s' % stdout)
    return apk_path


def get_component_name(app_id, activity):
    if activity.startswith(app_id):
        return '%s/.%s' % (app_id, activity[len(app_id) + 1:])
    else:
        return '%s/%s' % (app_id, activity)


def get_meminfo(app_id, device_id=None):
    cmd = create_adb_cmd('shell dumpsys meminfo -s %s' % app_id, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    for line in stdout.splitlines():
        if 'TOTAL PSS: ' in line:
            elements = [s for s in line.replace('TOTAL ', 'TOTAL_').split()]
            assert elements[0] == 'TOTAL_PSS:', elements[0]
            assert elements[1].isdigit()
            assert elements[2] == 'TOTAL_RSS:'
            assert elements[3].isdigit()
            return {
                'total_pss': int(elements[1]),
                'total_rss': int(elements[3])
            }
    raise ValueError('Unexpected stdout: %s' % stdout)


def get_profile_data(app_id, device_id=None):
    with utils.TempDir() as temp:
        source = get_profile_path(app_id)
        target = os.path.join(temp, 'primary.prof')
        pull(source, target, device_id)
        with open(target, 'rb') as f:
            return f.read()


def get_profile_path(app_id):
    return '/data/misc/profiles/cur/0/%s/primary.prof' % app_id


def get_minor_major_page_faults(app_id, device_id=None):
    pid = get_pid(app_id, device_id)
    cmd = create_adb_cmd('shell ps -p %i -o MINFL,MAJFL' % pid, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8')
    lines_it = iter(stdout.splitlines())
    first_line = next(lines_it)
    assert first_line == ' MINFL  MAJFL'
    second_line = next(lines_it)
    minfl, majfl = second_line.split()
    assert minfl.isdigit()
    assert majfl.isdigit()
    return (int(minfl), int(majfl))


def get_pid(app_id, device_id=None):
    cmd = create_adb_cmd('shell pidof %s' % app_id, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    assert stdout.isdigit()
    pid = int(stdout)
    return pid


def get_screen_state(device_id=None):
    cmd = create_adb_cmd('shell dumpsys nfc', device_id)
    process_result = subprocess.run(cmd, capture_output=True)
    stderr = process_result.stderr.decode('utf-8')
    if "Can't find service: nfc" in stderr:
        return ScreenState.UNKNOWN
    stdout = process_result.stdout.decode('utf-8').strip()
    screen_state_value = None
    for line in stdout.splitlines():
        if line.startswith('mScreenState='):
            value_start_index = len('mScreenState=')
            screen_state_value = line[value_start_index:]
    if screen_state_value is None:
        raise ValueError(
            'Expected to find mScreenState in: adb shell dumpsys nfc')
    if not hasattr(ScreenState, screen_state_value):
        raise ValueError(
            'Expected mScreenState to be a value of ScreenState, was: %s' %
            screen_state_value)
    return ScreenState[screen_state_value]


def get_classes_and_methods_from_app_profile(app_id, device_id=None):
    apk_path = get_apk_path(app_id, device_id)
    profile_path = get_profile_path(app_id)
    cmd = create_adb_cmd(
        'shell profman --dump-classes-and-methods'
        ' --profile-file=%s --apk=%s --dex-location=%s' %
        (profile_path, apk_path, apk_path), device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    lines = stdout.splitlines()
    return profile_utils.parse_art_profile(lines)


def get_screen_off_timeout(device_id=None):
    cmd = create_adb_cmd('shell settings get system screen_off_timeout',
                         device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    assert stdout.isdigit()
    screen_off_timeout = int(stdout)
    return screen_off_timeout


def grant(app_id, permission, device_id=None):
    cmd = create_adb_cmd('shell pm grant %s %s' % (app_id, permission),
                         device_id)
    subprocess.check_call(cmd)


def install(apk, device_id=None):
    print('Installing %s' % apk)
    cmd = create_adb_cmd('install %s' % apk, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8')
    assert 'Success' in stdout


def install_apks(apks, device_id=None, max_attempts=3):
    print('Installing %s' % apks)
    cmd = [
        'java', '-jar', utils.BUNDLETOOL_JAR, 'install-apks',
        '--apks=%s' % apks
    ]
    if device_id is not None:
        cmd.append('--device-id=%s' % device_id)
    for i in range(max_attempts):
        process_result = subprocess.run(cmd, capture_output=True)
        stdout = process_result.stdout.decode('utf-8')
        stderr = process_result.stderr.decode('utf-8')
        if process_result.returncode == 0:
            return
        print('Failed to install %s' % apks)
        print('Stdout: %s' % stdout)
        print('Stderr: %s' % stderr)
        print('Retrying...')
    raise Exception('Unable to install apks in %s attempts' % max_attempts)


def install_bundle(bundle, device_id=None):
    print('Installing %s' % bundle)
    with utils.TempDir() as temp:
        apks = os.path.join(temp, 'Bundle.apks')
        build_apks_from_bundle(bundle, apks)
        install_apks(apks, device_id)


def install_profile_using_adb(app_id, host_profile_path, device_id=None):
    device_profile_path = get_profile_path(app_id)
    cmd = create_adb_cmd('push %s %s' %
                         (host_profile_path, device_profile_path))
    subprocess.check_call(cmd)
    stop_app(app_id, device_id)
    force_profile_compilation(app_id, device_id)


def install_profile_using_profileinstaller(app_id, device_id=None):
    # This assumes that the profileinstaller library has been added to the app,
    # https://developer.android.com/jetpack/androidx/releases/profileinstaller.
    action = 'androidx.profileinstaller.action.INSTALL_PROFILE'
    component = '%s/androidx.profileinstaller.ProfileInstallReceiver' % app_id
    stdout = broadcast(action, component, device_id)
    assert len(stdout) == 2
    assert stdout[0] == ('Broadcasting: Intent { act=%s flg=0x400000 cmp=%s }' %
                         (action, component))
    assert stdout[1] == 'Broadcast completed: result=1', stdout[1]
    stop_app(app_id, device_id)
    force_profile_compilation(app_id, device_id)


def issue_key_event(key_event, device_id=None, sleep_in_seconds=1):
    cmd = create_adb_cmd('shell input keyevent %s' % key_event, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    assert len(stdout) == 0
    time.sleep(sleep_in_seconds)


def launch_activity(app_id,
                    activity,
                    device_id=None,
                    intent_data_uri=None,
                    wait_for_activity_to_launch=False):
    args = ['shell', 'am', 'start', '-n', '%s/%s' % (app_id, activity)]
    if intent_data_uri:
        args.extend(['-d', intent_data_uri])
    if wait_for_activity_to_launch:
        args.append('-W')
    cmd = create_adb_cmd(args, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    assert stdout.startswith('Starting: Intent {')
    expected_component = 'cmp=%s' % get_component_name(app_id, activity)
    assert expected_component in stdout, \
        'was %s, expected %s' % (stdout, expected_component)
    lines = stdout.splitlines()
    result = {}
    for line in lines:
        if line.startswith('TotalTime: '):
            total_time_str = line.removeprefix('TotalTime: ')
            assert total_time_str.isdigit()
            result['total_time'] = int(total_time_str)
    assert not wait_for_activity_to_launch or 'total_time' in result, lines
    return result


def navigate_to_home_screen(device_id=None):
    cmd = create_adb_cmd('shell input keyevent KEYCODE_HOME', device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def prepare_for_interaction_with_device(device_id=None, device_pin=None):
    # Increase screen off timeout to avoid device screen turns off.
    twenty_four_hours_in_millis = 24 * 60 * 60 * 1000
    previous_screen_off_timeout = get_screen_off_timeout(device_id)
    set_screen_off_timeout(twenty_four_hours_in_millis, device_id)

    # Unlock device.
    unlock(device_id, device_pin)

    teardown_options = {
        'previous_screen_off_timeout': previous_screen_off_timeout
    }
    return teardown_options


def pull(source, target, device_id=None):
    cmd = create_adb_cmd('pull %s %s' % (source, target), device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def root(device_id=None):
    cmd = create_adb_cmd('root', device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def set_screen_off_timeout(screen_off_timeout_in_millis, device_id=None):
    cmd = create_adb_cmd(
        'shell settings put system screen_off_timeout %i' %
        screen_off_timeout_in_millis, device_id)
    stdout = subprocess.check_output(cmd).decode('utf-8').strip()
    assert len(stdout) == 0


def start_logcat(device_id=None, format=None, filter=None, silent=False):
    args = ['logcat']
    if format:
        args.extend(['--format', format])
    if silent:
        args.append('-s')
    if filter:
        args.append(filter)
    cmd = create_adb_cmd(args, device_id)
    logcat_process = subprocess.Popen(cmd,
                                      bufsize=1024 * 1024,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
    reader = ProcessReader(logcat_process)
    reader.start()
    return reader


def stop_logcat(logcat_reader):
    logcat_reader.stop()
    logcat_reader.join()
    return logcat_reader.lines


def stop_app(app_id, device_id=None):
    print('Shutting down %s' % app_id)
    cmd = create_adb_cmd('shell am force-stop %s' % app_id, device_id)
    subprocess.check_call(cmd, stdout=DEVNULL, stderr=DEVNULL)


def teardown_after_interaction_with_device(teardown_options, device_id=None):
    # Reset screen off timeout.
    set_screen_off_timeout(teardown_options['previous_screen_off_timeout'],
                           device_id)


def toggle_screen(device_id=None):
    issue_key_event('KEYCODE_POWER', device_id)


def uninstall(app_id, device_id=None):
    print('Uninstalling %s' % app_id)
    cmd = create_adb_cmd('uninstall %s' % app_id, device_id)
    process_result = subprocess.run(cmd, capture_output=True)
    stdout = process_result.stdout.decode('utf-8')
    stderr = process_result.stderr.decode('utf-8')
    if process_result.returncode == 0:
        assert 'Success' in stdout
    elif stdout.startswith('cmd: Failure calling service package: Broken pipe'):
        assert app_id == 'com.google.android.youtube'
        print('Waiting after broken pipe')
        time.sleep(15)
    else:
        expected_error = (
            'java.lang.IllegalArgumentException: Unknown package: %s' % app_id)
        assert 'Failure [DELETE_FAILED_INTERNAL_ERROR]' in stdout \
            or expected_error in stderr, \
            'stdout: %s, stderr: %s' % (stdout, stderr)


def unlock(device_id=None, device_pin=None):
    ensure_screen_on(device_id)
    screen_state = get_screen_state(device_id)
    if screen_state.is_unknown():
        return
    assert screen_state.is_on(), 'was %s' % screen_state
    if screen_state.is_on_and_locked():
        if device_pin is not None:
            raise NotImplementedError(
                'Device unlocking with pin not implemented')
        issue_key_event('KEYCODE_MENU', device_id)
        screen_state = get_screen_state(device_id)
    assert screen_state.is_on_and_unlocked(), 'was %s' % screen_state


def parse_options(argv):
    result = argparse.ArgumentParser(description='Run adb utils.')
    result.add_argument('--capture-screen', help='Capture screen to given file')
    result.add_argument('--device-id', help='Device id (e.g., emulator-5554).')
    result.add_argument('--device-pin', help='Device pin code (e.g., 1234)')
    result.add_argument('--ensure-screen-off',
                        help='Ensure screen off',
                        action='store_true',
                        default=False)
    result.add_argument('--get-screen-state',
                        help='Get screen state',
                        action='store_true',
                        default=False)
    result.add_argument('--unlock',
                        help='Unlock device',
                        action='store_true',
                        default=False)
    options, args = result.parse_known_args(argv)
    return options, args


def main(argv):
    (options, args) = parse_options(argv)
    if options.capture_screen:
        capture_screen(options.capture_screen, options.device_id)
    if options.ensure_screen_off:
        ensure_screen_off(options.device_id)
    elif options.get_screen_state:
        print(get_screen_state(options.device_id))
    elif options.unlock:
        unlock(options.device_id, options.device_pin)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
