summaryrefslogtreecommitdiffstats
path: root/scripts/lib/wic/engine.py
blob: e169147aba8006a69c3451c5746ccd8da75eb024 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Copyright (c) 2013, Intel Corporation.
# All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# DESCRIPTION

# This module implements the image creation engine used by 'wic' to
# create images.  The engine parses through the OpenEmbedded kickstart
# (wks) file specified and generates images that can then be directly
# written onto media.
#
# AUTHORS
# Tom Zanussi <tom.zanussi (at] linux.intel.com>
#

import logging
import os
import tempfile

from collections import namedtuple, OrderedDict
from distutils.spawn import find_executable

from wic import WicError
from wic.filemap import sparse_copy
from wic.pluginbase import PluginMgr
from wic.misc import get_bitbake_var, exec_cmd

logger = logging.getLogger('wic')

def verify_build_env():
    """
    Verify that the build environment is sane.

    Returns True if it is, false otherwise
    """
    if not os.environ.get("BUILDDIR"):
        raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)")

    return True


CANNED_IMAGE_DIR = "lib/wic/canned-wks" # relative to scripts
SCRIPTS_CANNED_IMAGE_DIR = "scripts/" + CANNED_IMAGE_DIR
WIC_DIR = "wic"

def build_canned_image_list(path):
    layers_path = get_bitbake_var("BBLAYERS")
    canned_wks_layer_dirs = []

    if layers_path is not None:
        for layer_path in layers_path.split():
            for wks_path in (WIC_DIR, SCRIPTS_CANNED_IMAGE_DIR):
                cpath = os.path.join(layer_path, wks_path)
                if os.path.isdir(cpath):
                    canned_wks_layer_dirs.append(cpath)

    cpath = os.path.join(path, CANNED_IMAGE_DIR)
    canned_wks_layer_dirs.append(cpath)

    return canned_wks_layer_dirs

def find_canned_image(scripts_path, wks_file):
    """
    Find a .wks file with the given name in the canned files dir.

    Return False if not found
    """
    layers_canned_wks_dir = build_canned_image_list(scripts_path)

    for canned_wks_dir in layers_canned_wks_dir:
        for root, dirs, files in os.walk(canned_wks_dir):
            for fname in files:
                if fname.endswith("~") or fname.endswith("#"):
                    continue
                if fname.endswith(".wks") and wks_file + ".wks" == fname:
                    fullpath = os.path.join(canned_wks_dir, fname)
                    return fullpath
    return None


def list_canned_images(scripts_path):
    """
    List the .wks files in the canned image dir, minus the extension.
    """
    layers_canned_wks_dir = build_canned_image_list(scripts_path)

    for canned_wks_dir in layers_canned_wks_dir:
        for root, dirs, files in os.walk(canned_wks_dir):
            for fname in files:
                if fname.endswith("~") or fname.endswith("#"):
                    continue
                if fname.endswith(".wks"):
                    fullpath = os.path.join(canned_wks_dir, fname)
                    with open(fullpath) as wks:
                        for line in wks:
                            desc = ""
                            idx = line.find("short-description:")
                            if idx != -1:
                                desc = line[idx + len("short-description:"):].strip()
                                break
                    basename = os.path.splitext(fname)[0]
                    print("  %s\t\t%s" % (basename.ljust(30), desc))


def list_canned_image_help(scripts_path, fullpath):
    """
    List the help and params in the specified canned image.
    """
    found = False
    with open(fullpath) as wks:
        for line in wks:
            if not found:
                idx = line.find("long-description:")
                if idx != -1:
                    print()
                    print(line[idx + len("long-description:"):].strip())
                    found = True
                continue
            if not line.strip():
                break
            idx = line.find("#")
            if idx != -1:
                print(line[idx + len("#:"):].rstrip())
            else:
                break


def list_source_plugins():
    """
    List the available source plugins i.e. plugins available for --source.
    """
    plugins = PluginMgr.get_plugins('source')

    for plugin in plugins:
        print("  %s" % plugin)


def wic_create(wks_file, rootfs_dir, bootimg_dir, kernel_dir,
               native_sysroot, options):
    """
    Create image

    wks_file - user-defined OE kickstart file
    rootfs_dir - absolute path to the build's /rootfs dir
    bootimg_dir - absolute path to the build's boot artifacts directory
    kernel_dir - absolute path to the build's kernel directory
    native_sysroot - absolute path to the build's native sysroots dir
    image_output_dir - dirname to create for image
    options - wic command line options (debug, bmap, etc)

    Normally, the values for the build artifacts values are determined
    by 'wic -e' from the output of the 'bitbake -e' command given an
    image name e.g. 'core-image-minimal' and a given machine set in
    local.conf.  If that's the case, the variables get the following
    values from the output of 'bitbake -e':

    rootfs_dir:        IMAGE_ROOTFS
    kernel_dir:        DEPLOY_DIR_IMAGE
    native_sysroot:    STAGING_DIR_NATIVE

    In the above case, bootimg_dir remains unset and the
    plugin-specific image creation code is responsible for finding the
    bootimg artifacts.

    In the case where the values are passed in explicitly i.e 'wic -e'
    is not used but rather the individual 'wic' options are used to
    explicitly specify these values.
    """
    try:
        oe_builddir = os.environ["BUILDDIR"]
    except KeyError:
        raise WicError("BUILDDIR not found, exiting. (Did you forget to source oe-init-build-env?)")

    if not os.path.exists(options.outdir):
        os.makedirs(options.outdir)

    pname = 'direct'
    plugin_class = PluginMgr.get_plugins('imager').get(pname)
    if not plugin_class:
        raise WicError('Unknown plugin: %s' % pname)

    plugin = plugin_class(wks_file, rootfs_dir, bootimg_dir, kernel_dir,
                          native_sysroot, oe_builddir, options)

    plugin.do_create()

    logger.info("The image(s) were created using OE kickstart file:\n  %s", wks_file)


def wic_list(args, scripts_path):
    """
    Print the list of images or source plugins.
    """
    if args.list_type is None:
        return False

    if args.list_type == "images":

        list_canned_images(scripts_path)
        return True
    elif args.list_type == "source-plugins":
        list_source_plugins()
        return True
    elif len(args.help_for) == 1 and args.help_for[0] == 'help':
        wks_file = args.list_type
        fullpath = find_canned_image(scripts_path, wks_file)
        if not fullpath:
            raise WicError("No image named %s found, exiting. "
                           "(Use 'wic list images' to list available images, "
                           "or specify a fully-qualified OE kickstart (.wks) "
                           "filename)" % wks_file)

        list_canned_image_help(scripts_path, fullpath)
        return True

    return False


class Disk:
    def __init__(self, imagepath, native_sysroot):
        self.imagepath = imagepath
        self.native_sysroot = native_sysroot
        self._partitions = None
        self._partimages = {}
        self._lsector_size = None
        self._psector_size = None
        self._ptable_format = None

        # find parted
        self.paths = "/bin:/usr/bin:/usr/sbin:/sbin/"
        if native_sysroot:
            for path in self.paths.split(':'):
                self.paths = "%s%s:%s" % (native_sysroot, path, self.paths)

        self.parted = find_executable("parted", self.paths)
        if not self.parted:
            raise WicError("Can't find executable parted")

    def __del__(self):
        for path in self._partimages.values():
            os.unlink(path)

    @property
    def partitions(self):
        if self._partitions is None:
            self._partitions = OrderedDict()
            out = exec_cmd("%s -sm %s unit B print" % (self.parted, self.imagepath))
            parttype = namedtuple("Part", "pnum start end size fstype")
            splitted = out.splitlines()
            lsector_size, psector_size, self._ptable_format = splitted[1].split(":")[3:6]
            self._lsector_size = int(lsector_size)
            self._psector_size = int(psector_size)
            for line in splitted[2:]:
                pnum, start, end, size, fstype = line.split(':')[:5]
                partition = parttype(int(pnum), int(start[:-1]), int(end[:-1]),
                                     int(size[:-1]), fstype)
                self._partitions[pnum] = partition

        return self._partitions

    def __getattr__(self, name):
        """Get path to the executable in a lazy way."""
        if name in ("mdir", "mcopy", "mdel", "mdeltree"):
            aname = "_%s" % name
            if aname not in self.__dict__:
                setattr(self, aname, find_executable(name, self.paths))
                if aname not in self.__dict__:
                    raise WicError("Can't find executable {}".format(name))
            return self.__dict__[aname]
        return self.__dict__[name]

    def _get_part_image(self, pnum):
        if pnum not in self.partitions:
            raise WicError("Partition %s is not in the image")
        part = self.partitions[pnum]
        if not part.fstype.startswith("fat"):
            raise WicError("Not supported fstype: {}".format(part.fstype))
        if pnum not in self._partimages:
            tmpf = tempfile.NamedTemporaryFile(prefix="wic-part")
            dst_fname = tmpf.name
            tmpf.close()
            sparse_copy(self.imagepath, dst_fname, skip=part.start, length=part.size)
            self._partimages[pnum] = dst_fname

        return self._partimages[pnum]

    def _put_part_image(self, pnum):
        """Put partition image into partitioned image."""
        sparse_copy(self._partimages[pnum], self.imagepath,
                    seek=self.partitions[pnum].start)

    def dir(self, pnum, path):
        return exec_cmd("{} -i {} ::{}".format(self.mdir,
                                               self._get_part_image(pnum),
                                               path))

    def copy(self, src, pnum, path):
        """Copy partition image into wic image."""
        cmd = "{} -i {} -snop {} ::{}".format(self.mcopy,
                                              self._get_part_image(pnum),
                                              src, path)
        exec_cmd(cmd)
        self._put_part_image(pnum)

    def remove(self, pnum, path):
        """Remove files/dirs from the partition."""
        partimg = self._get_part_image(pnum)
        cmd = "{} -i {} ::{}".format(self.mdel, partimg, path)
        try:
            exec_cmd(cmd)
        except WicError as err:
            if "not found" in str(err) or "non empty" in str(err):
                # mdel outputs 'File ... not found' or 'directory .. non empty"
                # try to use mdeltree as path could be a directory
                cmd = "{} -i {} ::{}".format(self.mdeltree,
                                             partimg, path)
                exec_cmd(cmd)
            else:
                raise err
        self._put_part_image(pnum)

def wic_ls(args, native_sysroot):
    """List contents of partitioned image or vfat partition."""
    disk = Disk(args.path.image, native_sysroot)
    if not args.path.part:
        if disk.partitions:
            print('Num     Start        End          Size      Fstype')
            for part in disk.partitions.values():
                print("{:2d}  {:12d} {:12d} {:12d}  {}".format(\
                          part.pnum, part.start, part.end,
                          part.size, part.fstype))
    else:
        path = args.path.path or '/'
        print(disk.dir(args.path.part, path))

def wic_cp(args, native_sysroot):
    """
    Copy local file or directory to the vfat partition of
    partitioned image.
    """
    disk = Disk(args.dest.image, native_sysroot)
    disk.copy(args.src, args.dest.part, args.dest.path)

def wic_rm(args, native_sysroot):
    """
    Remove files or directories from the vfat partition of
    partitioned image.
    """
    disk = Disk(args.path.image, native_sysroot)
    disk.remove(args.path.part, args.path.path)

def find_canned(scripts_path, file_name):
    """
    Find a file either by its path or by name in the canned files dir.

    Return None if not found
    """
    if os.path.exists(file_name):
        return file_name

    layers_canned_wks_dir = build_canned_image_list(scripts_path)
    for canned_wks_dir in layers_canned_wks_dir:
        for root, dirs, files in os.walk(canned_wks_dir):
            for fname in files:
                if fname == file_name:
                    fullpath = os.path.join(canned_wks_dir, fname)
                    return fullpath

def get_custom_config(boot_file):
    """
    Get the custom configuration to be used for the bootloader.

    Return None if the file can't be found.
    """
    # Get the scripts path of poky
    scripts_path = os.path.abspath("%s/../.." % os.path.dirname(__file__))

    cfg_file = find_canned(scripts_path, boot_file)
    if cfg_file:
        with open(cfg_file, "r") as f:
            config = f.read()
        return config