• R/O
  • HTTP
  • SSH
  • HTTPS

Tags
Keine Tags

Frequently used words (click to add to your profile)

java c++ android linux c# windows objective-c cocoa 誰得 qt python php ruby game gui bathyscaphe c 計画中(planning stage) 翻訳 omegat framework twitter dom test vb.net directx ゲームエンジン btron arduino previewer

Docker-based CI tool


File Info

Rev. 65142a1d8af111906f8c2f4fcc841cc5362977e3
Größe 4,473 Bytes
Zeit 2019-06-06 02:19:38
Autor hylom
Log Message

core-server: fix log extractor

Content

#!/usr/bin/env python3

import os
import re
import pathlib
import time
import concurrent.futures
import json

import grpc
from concurrent import futures

import docker

from dockrun_pb2 import RunTaskRequest, RunTaskReply
from dockrun_pb2_grpc import DockRunServicer, add_DockRunServicer_to_server

from config import config as cfg

# Address the gRPC server binds to (passed to add_insecure_port in main()).
PORT = '[::]:1234'
# Length of each sleep in main()'s keep-alive loop.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24

# Base directory under which FileReporter creates per-task result directories.
OUTPUT_DIR = './results'

# Pool that runs submitted tasks in the background; at most two run at once.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

def run_task(param, task):
    """Run one configured task in a Docker container and record the outcome.

    A ``FileReporter`` is created for the run; ``status.json`` is written
    once before the container is started (``container_status`` "Running")
    and again after it has finished ("Done"), followed by ``log.txt``.

    Args:
        param: Mapping describing the request. The keys read here are
            ``task_name``, ``user_agent``, ``parameter`` and
            ``parameter_type``.
        task: Task configuration mapping. Recognised keys:
            ``run_mode`` ("run" creates a new container from ``image``,
            "start" restarts the existing container named by ``container``;
            default "run"), ``timeout`` (passed to ``container.wait``,
            default 60) and ``auto_remove`` (default True, only honoured in
            "run" mode).

    Returns:
        True once status and log have been written.

    Raises:
        ValueError: If ``run_mode`` is neither "run" nor "start".
    """
    print("run task: '{}'".format(param["task_name"]))

    run_mode = task.get("run_mode", "run")
    if run_mode not in ("run", "start"):
        # Reject a bad configuration before any output directory is created;
        # otherwise neither branch below would bind `container` and the
        # function would die later with an UnboundLocalError.
        raise ValueError(
            "unsupported run_mode '{}' for task '{}'".format(
                run_mode, param["task_name"]))

    # NOTE(review): "succeeded" is never set to True anywhere in this
    # function -- confirm whether it should reflect the container exit status.
    status = {
        "succeeded": False,
        "container_status": "Running",
        "user_agent": param["user_agent"],
        # Copied into a plain dict (presumably so json.dump can serialise it
        # regardless of the mapping type the caller passed in).
        "parameter": dict(param["parameter"]),
        "parameter_type": param["parameter_type"],
    }

    reporter = FileReporter(param["task_name"], status)
    reporter.write_status()

    timeout = task.get("timeout", 60)
    start_epoch = time.time()

    client = docker.from_env()
    if run_mode == "run":
        container = client.containers.run(image=task["image"], detach=True)
    else:  # run_mode == "start"
        container = client.containers.get(task["container"])
        container.start()
    # NOTE(review): if wait() raises (e.g. on timeout) the status file is
    # left at "Running" and the container is not removed.
    container.wait(timeout=timeout)

    print("task done.")

    # Only fetch output produced after this run began; a container reused in
    # "start" mode may still hold logs from earlier runs.
    logs = container.logs(since=start_epoch)

    print("extract logs done.")

    if run_mode == "run" and task.get("auto_remove", True):
        container.remove()
        print("container removed.")

    reporter.set_log(logs)
    status["container_status"] = "Done"
    reporter.write_status()
    reporter.write_log()

    print("done.")
    return True


class FileReporter:
    """Write a task run's status and log into a fresh per-run directory.

    On construction a new numbered directory
    ``OUTPUT_DIR/<task_name>/<n>`` is created, where ``n`` is one more than
    the highest purely numeric sub-directory already present. ``status.json``
    and ``log.txt`` are written inside it.
    """

    # Sub-directory names that count as previous run numbers.
    _RUN_DIR_RE = re.compile(r'''^\d+$''')

    def __init__(self, task_name, status, log=None):
        """Allocate the output directory for this run.

        Args:
            task_name: Name of the task; used as the directory name below
                ``OUTPUT_DIR``.
            status: JSON-serialisable object written by ``write_status``.
                The object is stored by reference, so later mutations by the
                caller are picked up by the next ``write_status`` call.
            log: Optional bytes written by ``write_log``.
        """
        self.task_name = task_name
        self.status = status
        self.log = log
        self.output_basedir = os.path.join(OUTPUT_DIR, task_name)
        self.output_dir = self._get_output_directory()

    def set_log(self, log):
        """Replace the log payload that ``write_log`` will write."""
        self.log = log

    def write_log(self):
        """Write the stored log bytes to ``log.txt`` in the run directory."""
        pn = self.output_dir / "log.txt"
        # Binary mode: the log is written exactly as stored, without decoding.
        with pn.open(mode='wb') as f:
            f.write(self.log)

    def write_status(self):
        """Dump the current status object as JSON to ``status.json``."""
        pn = self.output_dir / "status.json"
        with pn.open(mode='w') as f:
            json.dump(self.status, f)

    def _get_output_directory(self):
        """Create and return the next numbered run directory as a ``Path``.

        The base directory (including missing parents such as the results
        root) is created on demand. If the chosen number already exists --
        for instance because another worker thread created it concurrently
        -- the next number is tried instead of failing.
        """
        base = pathlib.Path(self.output_basedir)
        # exist_ok avoids the check-then-create race; parents=True lets the
        # very first run succeed when the results root does not exist yet.
        base.mkdir(parents=True, exist_ok=True)

        last_num = 0
        for entry in base.iterdir():
            if entry.is_dir() and self._RUN_DIR_RE.match(entry.name):
                last_num = max(last_num, int(entry.name))

        while True:
            last_num += 1
            out_dir = base / str(last_num)
            try:
                out_dir.mkdir()
            except FileExistsError:
                # Name taken since the scan above (or by a plain file):
                # move on to the next number.
                continue
            return out_dir

    
def _exec_done(future):
    result = future.result()
    print("task done.")
    print(result)
            
class DockRun(DockRunServicer):
    """gRPC servicer that queues configured tasks on the background pool."""

    def RunTask(self, req, context):
        """Look up the requested task and submit it for execution.

        The reply is returned as soon as the task has been submitted, so
        ``is_succeed=True`` means "accepted", not "finished".

        Args:
            req: Request carrying ``task_name``, ``client_name``,
                ``user_agent``, ``parameter`` and ``parameter_type``.
            context: gRPC call context (not used here).

        Returns:
            ``RunTaskReply`` with ``is_succeed=False`` and an explanatory
            message when the task is not configured, otherwise
            ``is_succeed=True`` with an empty message.
        """
        task = cfg.get("tasks", {}).get(req.task_name)

        print("RunTask: {}".format(req.task_name))

        if not task:
            message = "task '{}' not found.".format(req.task_name)
            print(message)
            return RunTaskReply(is_succeed=False, message=message)

        # Collect the request fields handed on to the worker.
        forwarded_fields = (
            "task_name",
            "client_name",
            "user_agent",
            "parameter",
            "parameter_type",
        )
        param = {field: getattr(req, field) for field in forwarded_fields}

        print("execute task...")
        executor.submit(run_task, param, task).add_done_callback(_exec_done)

        return RunTaskReply(is_succeed=True, message="")

def main():
    """Serve DockRun over insecure gRPC on PORT until interrupted.

    The main thread only sleeps; Ctrl-C (KeyboardInterrupt) stops the
    server immediately.
    """
    rpc_pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(rpc_pool)
    add_DockRunServicer_to_server(DockRun(), server)
    server.add_insecure_port(PORT)
    server.start()

    # Keep the main thread alive while the server runs.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()