geslaunch.py 15 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#!/usr/bin/env python3
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.

import os
import sys
22
import urllib.parse
23
import subprocess
24 25
from launcher import utils
from urllib.parse import unquote
26
import xml.etree.ElementTree as ET
27
from launcher.baseclasses import GstValidateTest, TestsManager, ScenarioManager, MediaFormatCombination, \
28 29 30 31 32 33 34 35 36 37 38
    MediaDescriptor, GstValidateEncodingTestInterface

# Duration tolerance: half of utils.GST_SECOND.
GES_DURATION_TOLERANCE = utils.GST_SECOND / 2

# Executable used to run GES projects; it carries an ".exe" suffix when
# sys.platform contains "win32".
GES_LAUNCH_COMMAND = (
    "ges-launch-1.0.exe" if "win32" in sys.platform else "ges-launch-1.0"
)


# Every combination a project gets rendered to; each tuple holds the three
# positional arguments handed to MediaFormatCombination, in order.
GES_ENCODING_TARGET_COMBINATIONS = [
    MediaFormatCombination(*formats)
    for formats in (
        ("ogg", "vorbis", "theora"),
        ("ogg", "opus", "theora"),
        ("webm", "vorbis", "vp8"),
        ("webm", "opus", "vp8"),
        ("mp4", "aac", "h264"),
        ("mp4", "ac3", "h264"),
        ("quicktime", "aac", "jpeg"),
        ("mkv", "opus", "h264"),
        ("mkv", "vorbis", "h264"),
        ("mkv", "opus", "jpeg"),
        ("mkv", "vorbis", "jpeg"),
    )
]
50 51 52 53 54 55 56


def quote_uri(uri):
    """
    Re-encode a URI or plain path, leaving any "file:///" prefix untouched.

    Only the path component of *uri* is kept (fragments are not split off,
    so a "#" remains part of the path). It is unquoted and then converted
    back to a URL with utils.path2url().
    """
    path_component = urllib.parse.urlsplit(uri, allow_fragments=False).path
    # Unquote first so an already-encoded input is never encoded twice.
    return utils.path2url(unquote(path_component))


class XgesProjectDescriptor(MediaDescriptor):
    """Media descriptor backed by a GES project (.xges) XML file.

    The file designated by the URI is parsed when the descriptor is built;
    the duration is extracted lazily from the "timeline" element and cached.
    """

    # Text preceding the duration value in the timeline "metadatas" attribute.
    _DURATION_MARKER = "duration=(guint64)"

    def __init__(self, uri):
        """Parse the project file designated by *uri*."""
        super(XgesProjectDescriptor, self).__init__()

        self._uri = uri
        self._xml_path = utils.url2path(uri)
        self._root = ET.parse(self._xml_path)
        self._duration = None

    def get_media_filepath(self):
        """Return the filesystem path of the project file."""
        return self._xml_path

    def get_path(self):
        """Return the filesystem path of the project file."""
        return self._xml_path

    def get_caps(self):
        """Not supported for projects; always raises NotImplementedError."""
        # NotImplemented is a comparison sentinel, not an exception class:
        # raising it fails with a TypeError on Python 3.
        raise NotImplementedError

    def get_uri(self):
        """Return the URI this descriptor was created with."""
        return self._uri

    @classmethod
    def _parse_duration(cls, metadatas):
        """Return the integer duration found in *metadatas*, or None.

        The value is the text following the duration marker, up to the first
        space or semicolon.
        """
        _, marker, tail = metadatas.partition(cls._DURATION_MARKER)
        if not marker:
            return None
        try:
            return int(tail.split(" ")[0].split(";")[0])
        except ValueError:
            return None

    def get_duration(self):
        """Return the project duration, in the unit of utils.GST_SECOND.

        The value is read from the first "timeline" element. When it is
        missing or unparsable an error is logged and two minutes are assumed.
        """
        if self._duration:
            return self._duration

        for element in self._root.iter():
            if element.tag == "timeline":
                self._duration = self._parse_duration(
                    element.attrib.get("metadatas", ""))
                break

        if not self._duration:
            self.error("%s does not have duration! (setting 2mins)" % self._uri)
            # Callers divide the duration by utils.GST_SECOND to get seconds,
            # so the fallback has to be expressed in that unit as well.
            self._duration = 2 * 60 * utils.GST_SECOND

        return self._duration

    def get_protocol(self):
        """Return the protocol of the project, which is always a local file."""
        # NOTE(review): assumes Protocols is defined in launcher.utils next to
        # Result; the bare name was not imported anywhere in this module.
        return utils.Protocols.FILE

    def is_seekable(self):
        """Projects are always reported as seekable."""
        return True

    def is_image(self):
        """Projects are never reported as images."""
        return False

    def get_num_tracks(self, track_type):
        """Count the "track" elements whose caps contain *track_type*."""
        return sum(1 for track in self._root.iter("track")
                   if track_type in track.attrib["caps"])


class GESTest(GstValidateTest):
    """Base class of the tests that run a project through ges-launch."""

    def __init__(self, classname, options, reporter, project, scenario=None,
                 combination=None, expected_failures=None, nest=False):
        """Store the project and whether it is loaded as a nested clip.

        *combination* and *expected_failures* are accepted but not used by
        this constructor.
        """
        super(GESTest, self).__init__(GES_LAUNCH_COMMAND, classname, options,
                                      reporter, scenario=scenario)

        self.project = project
        self.nested = nest

    def set_sample_paths(self):
        """Add the sample path arguments to the ges-launch command line.

        When no path was configured, the directory of the project file is
        used, unless recursion is disabled, in which case nothing is added.
        """
        options = self.options
        sample_paths = options.paths

        if not sample_paths:
            if options.disable_recurse:
                return
            if self.project:
                sample_paths = [
                    os.path.dirname(self.project.get_media_filepath())]
            else:
                sample_paths = []
        elif not isinstance(sample_paths, list):
            sample_paths = [sample_paths]

        if options.disable_recurse:
            flag = "--ges-sample-paths"
        else:
            flag = "--ges-sample-path-recurse"

        for sample_path in sample_paths:
            # We always want paths separator to be cut with '/' for ges-launch
            self.add_arguments(flag, quote_uri(sample_path.replace("\\", "/")))

    def build_arguments(self):
        """Build the ges-launch arguments: sinks, sample paths and project."""
        GstValidateTest.build_arguments(self)

        if self.options.mute:
            # Fake sinks only need to sync on the clock if the scenario
            # asks for it.
            needs_clock = self.scenario.needs_clock_sync() \
                if self.scenario else False
            audiosink = utils.get_fakesink_for_media_type("audio", needs_clock)
            videosink = utils.get_fakesink_for_media_type("video", needs_clock)
        else:
            audiosink, videosink = 'autoaudiosink', 'autovideosink'

        self.add_arguments("--videosink", videosink + " name=videosink")
        self.add_arguments("--audiosink", audiosink + " name=audiosink")

        self.set_sample_paths()

        if not self.project:
            return

        # A nested project is added as a clip, otherwise it is loaded directly.
        load_flag = "+clip" if self.nested else "-l"
        self.add_arguments(load_flag, self.project.get_uri())
170 171

class GESPlaybackTest(GESTest):
    """Test that plays a project back, optionally as a nested timeline."""

    def __init__(self, classname, options, reporter, project, scenario, nest):
        """Forward everything to GESTest, passing scenario and nest by keyword."""
        super().__init__(classname, options, reporter, project,
                         scenario=scenario, nest=nest)

    def get_current_value(self):
        """Return the current position as reported by get_current_position()."""
        position = self.get_current_position()
        return position

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
class GESScenarioTest(GESTest):
    """Test that runs a standalone scenario file, without any project."""

    def __init__(self, classname, options, reporter, scenario):
        """Create the test with no project; only *scenario* drives it."""
        super().__init__(classname, options, reporter, None, scenario=scenario)

    def build_arguments(self):
        """Build the base arguments, then pass the scenario path explicitly."""
        super().build_arguments()
        self.add_arguments("--set-scenario", self.scenario.path)

    def get_subproc_env(self):
        """Return the subprocess environment, built with the scenario hidden.

        The scenario is temporarily unset while the parent class builds the
        environment (presumably because it is already handed over through
        "--set-scenario"), and is restored afterwards even if the parent
        raises.
        """
        scenario = self.scenario
        self.scenario = None
        try:
            return super().get_subproc_env()
        finally:
            # Restore on every exit path so a failure cannot leave the test
            # without its scenario.
            self.scenario = scenario

    def get_current_value(self):
        """Return the current position as reported by get_current_position()."""
        return self.get_current_position()

198 199

class GESRenderTest(GESTest, GstValidateEncodingTestInterface):
    """Test that renders a project to a file with a given format combination."""

    def __init__(self, classname, options, reporter, project, combination):
        """Initialise both bases; the encoding interface gets the project."""
        GESTest.__init__(self, classname, options, reporter, project)

        GstValidateEncodingTestInterface.__init__(self, combination, self.project)

    def build_arguments(self):
        """Build the base arguments, then add the rendering ones."""
        GESTest.build_arguments(self)
        self._set_rendering_info()

    def run_external_checks(self):
        """Run the IQA test if a "<project>.expected_result" file exists."""
        # The URI path is percent-encoded; decode it before using it as a
        # filesystem path, otherwise paths with e.g. spaces are never found.
        uri_path = urllib.parse.urlsplit(self.media_descriptor.get_uri()).path
        reference_file_path = unquote(uri_path) + ".expected_result"
        if os.path.exists(reference_file_path):
            self.run_iqa_test(utils.path2url(reference_file_path))

    def _set_rendering_info(self):
        """Compute the destination file and add the "-f"/"-o" arguments."""
        # The class name is turned into a directory hierarchy under dest.
        relative_dest = self.classname.replace(".render.", os.sep).replace(".", os.sep)
        self.dest_file = os.path.join(self.options.dest, relative_dest)
        utils.mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        if not utils.isuri(self.dest_file):
            self.dest_file = utils.path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-f", profile, "-o", self.dest_file)

    def check_results(self):
        """Set the test result from the process outcome and rendered file.

        Without a scenario, a run that passed (or was not run) is validated
        by checking the encoded file, unless the process exited with an error.
        A timeout whose output file has the project duration is reported as a
        probable missing EOS. Every other case is left to GstValidateTest.
        """
        if self.result in [utils.Result.PASSED, utils.Result.NOT_RUN] and self.scenario is None:
            if self.process.returncode != 0:
                return super().check_results()

            res, msg = self.check_encoded_file()
            self.set_result(res, msg)
        elif self.result == utils.Result.TIMEOUT:
            missing_eos = False
            try:
                if utils.get_duration(self.dest_file) == self.project.get_duration():
                    missing_eos = True
            except Exception:
                # Best effort: failing to inspect the rendered file simply
                # means the plain timeout result is kept.
                pass

            if missing_eos:
                self.set_result(utils.Result.TIMEOUT, "The rendered file had right duration, MISSING EOS?\n",
                                "failure")
        else:
            GstValidateTest.check_results(self)

    def get_current_value(self):
        """Return the current output size, or the position if it is unknown."""
        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size


class GESTestsManager(TestsManager):
    """Tests manager registering playback, render and scenario tests for GES."""

    name = "ges"

    # Shared scenario manager; its config is set in set_settings().
    _scenarios = ScenarioManager()

    def __init__(self):
        super(GESTestsManager, self).__init__()

    def init(self):
        """Return True if ges-launch is usable, i.e. knows "--set-scenario".

        A warning is logged and False is returned when the tool cannot be run
        or when its help output does not mention the option.
        """
        try:
            help_text = subprocess.check_output([GES_LAUNCH_COMMAND, "--help"]).decode()
        except (subprocess.CalledProcessError, OSError) as e:
            self.warning("Can not use ges-launch: %s" % e)
            return False

        if "--set-scenario=" in help_text:
            return True

        self.warning("Can not use ges-launch, it seems not to be compiled against"
                     " gst-validate")
        return False

    def add_options(self, parser):
        """Register the GES specific command line options on *parser*."""
        group = parser.add_argument_group("GStreamer Editing Services specific option"
                                          " and behaviours",
                                          description="""
The GStreamer Editing Services launcher will be usable only if GES has been compiled against GstValidate
You can simply run scenarios specifying project as args. For example the following will run all available
and activated scenarios on project.xges:

    $gst-validate-launcher ges /some/ges/project.xges


Available options:""")
        group.add_argument("-P", "--projects-paths", dest="projects_paths",
                           default=os.path.join(utils.DEFAULT_GST_QA_ASSETS,
                                                "ges",
                                                "ges-projects"),
                           help="Paths in which to look for GES projects (.xges files)")
        group.add_argument("--ges-scenario-paths", dest="scenarios_path",
                           default=None,
                           help="Paths in which to look for scenario files (.scenario)")
        group.add_argument("-r", "--disable-recurse-paths", dest="disable_recurse",
                           default=False, action="store_true",
                           help="Do not recurse into paths to find medias")

    def set_settings(self, options, args, reporter):
        """Store the settings and make sure the destination directory exists."""
        TestsManager.set_settings(self, options, args, reporter)
        self._scenarios.config = self.options

        try:
            # url2path() returns the path itself; indexing it would only
            # keep its first character.
            os.makedirs(utils.url2path(options.dest))
        except OSError:
            # Best effort: the directory usually exists already.
            pass

    def list_tests(self):
        """Return the tests registered so far."""
        return self.tests

    def register_defaults(self, project_paths=None, scenarios_path=None):
        """Register the default playback, render and scenario tests.

        Without command line arguments, projects (.xges) are searched under
        *project_paths* (or the "projects_paths" option) and scenario files
        (.scenario) under the "scenarios_path" option (or *scenarios_path*).
        Otherwise every argument is taken as a project path or URI and kept
        if the file exists.
        """
        projects = []
        all_scenarios = {}
        if not self.args:
            if project_paths is None:
                path = self.options.projects_paths
            else:
                path = project_paths

            for root, _dirs, files in os.walk(path):
                for f in files:
                    if not f.endswith(".xges"):
                        continue
                    # root already starts with the walked path.
                    projects.append(utils.path2url(os.path.join(root, f)))

            if self.options.scenarios_path:
                scenarios_path = self.options.scenarios_path

            if scenarios_path:
                for root, _dirs, files in os.walk(scenarios_path):
                    for f in files:
                        if not f.endswith(".scenario"):
                            continue
                        f = os.path.join(root, f)
                        # An optional "<scenario>.config" file may sit next
                        # to the scenario.
                        config = f + ".config"
                        if not os.path.exists(config):
                            config = None
                        all_scenarios[f] = config
        else:
            for proj_uri in self.args:
                if not utils.isuri(proj_uri):
                    proj_uri = utils.path2url(proj_uri)

                # Existence has to be checked on the filesystem path, not on
                # the URI string.
                if os.path.exists(utils.url2path(proj_uri)):
                    projects.append(proj_uri)

        if self.options.long_limit != 0:
            scenarios = ["none",
                         "scrub_forward_seeking",
                         "scrub_backward_seeking"]
        else:
            scenarios = ["play_15s",
                         "scrub_forward_seeking_full",
                         "scrub_backward_seeking_full"]

        for proj_uri in projects:
            # First playback cases
            project = XgesProjectDescriptor(proj_uri)
            for scenario_name in scenarios:
                scenario = self._scenarios.get_scenario(scenario_name)
                if scenario is None:
                    continue

                # Skip scenarios needing a longer media than this project.
                if scenario.get_min_media_duration() >= (project.get_duration() / utils.GST_SECOND):
                    continue

                classname = "playback.%s.%s" % (scenario.name,
                                                os.path.basename(proj_uri).replace(".xges", ""))
                self.add_test(GESPlaybackTest(classname,
                                              self.options,
                                              self.reporter,
                                              project,
                                              scenario=scenario,
                                              nest=False))
                # For nested timelines
                classname = "playback.nested.%s.%s" % (scenario.name,
                                                       os.path.basename(proj_uri).replace(".xges", ""))
                self.add_test(GESPlaybackTest(classname,
                                              self.options,
                                              self.reporter,
                                              project,
                                              scenario=scenario,
                                              nest=True))

            # And now rendering cases
            for comb in GES_ENCODING_TARGET_COMBINATIONS:
                classname = "render.%s.%s" % (str(comb).replace(' ', '_'),
                                              os.path.splitext(os.path.basename(proj_uri))[0])
                self.add_test(GESRenderTest(classname, self.options,
                                            self.reporter, project,
                                            combination=comb))

        if all_scenarios:
            for scenario in self._scenarios.discover_scenarios(list(all_scenarios)):
                config = all_scenarios[scenario.path]
                classname = "scenario.%s" % scenario.name
                test = GESScenarioTest(classname, self.options, self.reporter, scenario=scenario)
                if config:
                    test.add_validate_config(config)
                self.add_test(test)