summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/fetch2/npm.py
blob: 47898509ff930df2d36db4b80c3f5d3232c0d987 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# Copyright (C) 2020 Savoir-Faire Linux
#
# SPDX-License-Identifier: GPL-2.0-only
#
"""
BitBake 'Fetch' npm implementation

The npm fetcher supports SRC_URI entries with the following format:
SRC_URI = "npm://some.registry.url;OptionA=xxx;OptionB=xxx;..."

Supported SRC_URI options are:

- package
    The npm package name. This is a mandatory parameter.

- version
    The npm package version. This is a mandatory parameter.

- downloadfilename
    Specifies the filename used when storing the downloaded file.

- destsuffix
    Specifies the directory to use to unpack the package (default: npm).
"""

import base64
import json
import os
import re
import shlex
import tempfile
import bb
from bb.fetch2 import Fetch
from bb.fetch2 import FetchError
from bb.fetch2 import FetchMethod
from bb.fetch2 import MissingParameterError
from bb.fetch2 import ParameterError
from bb.fetch2 import URI
from bb.fetch2 import check_network_access
from bb.fetch2 import runfetchcmd
from bb.utils import is_semver

def npm_package(package):
    """Return the npm package name stripped from unsupported characters"""
    if not package.startswith("@"):
        return package

    # A scoped package ("@scope/name") is flattened the same way the
    # 'npm pack' command does: the leading '@' is dropped and every '/'
    # becomes a '-'.
    unscoped = package[1:]
    return unscoped.replace("/", "-")

def npm_filename(package, version):
    """Build the tarball filename of a npm package: <name>-<version>.tgz"""
    basename = npm_package(package)
    return "".join((basename, "-", version, ".tgz"))

def npm_localfile(package, version):
    """Build the local filename of a npm package, below the 'npm2' directory"""
    tarball_name = npm_filename(package, version)
    return os.path.join("npm2", tarball_name)

def npm_integrity(integrity):
    """
    Get the checksum name and expected value from the subresource integrity
        https://www.w3.org/TR/SRI/

    The integrity metadata is made of one or more whitespace separated
    "<algo>-<base64 digest>" hash expressions, each optionally followed by
    "?<options>". When several expressions are given, the one using the
    strongest known algorithm is selected (the first one wins on a tie).

    Returns a ("<algo>sum", "<hexadecimal digest>") tuple.

    Raises ValueError if the metadata is empty or if an expression has no
    "-" separator.
    """
    # Known algorithms, ordered from the weakest to the strongest
    strength = ("md5", "sha1", "sha256", "sha384", "sha512")

    selected = None
    selected_rank = None

    for expression in integrity.split():
        # Raises ValueError when the '-' separator is missing
        algo, value = expression.split("-", maxsplit=1)
        rank = strength.index(algo) if algo in strength else -1
        if selected is None or rank > selected_rank:
            selected = (algo, value)
            selected_rank = rank

    if selected is None:
        raise ValueError("No hash expression in integrity %r" % integrity)

    algo, value = selected

    # Drop the options: '?' is not part of the base64 alphabet, so anything
    # after it can't belong to the digest.
    value = value.split("?", 1)[0]

    return "%ssum" % algo, base64.b64decode(value).hex()

def npm_unpack(tarball, destdir, d):
    """
    Extract a npm tarball into destdir

    The destination directory is created if needed and the first path
    component of every archive member is stripped.
    """
    bb.utils.mkdirhier(destdir)

    tar_cmd = " ".join([
        "tar",
        "--extract",
        "--gzip",
        "--file=%s" % shlex.quote(tarball),
        "--no-same-owner",
        "--strip-components=1",
    ])

    # The command is run from within the destination directory
    runfetchcmd(tar_cmd, d, workdir=destdir)

class NpmEnvironment(object):
    """
    Using a npm config file seems more reliable than using cli arguments.
    This class allows to create a controlled environment for npm commands.
    """
    def __init__(self, d, configs=None):
        """
        Store the environment settings

        d: the datastore, copied for each call to run()
        configs: optional iterable of (key, value) pairs, applied with
                 'npm config set' before each command
        """
        self.d = d
        self.configs = configs

    def run(self, cmd, args=None, configs=None, workdir=None):
        """
        Run npm command in a controlled environment

        cmd: the command line to run
        args: optional iterable of (key, value) pairs, appended to the
              command line as '--key=value'
        configs: optional iterable of (key, value) pairs, applied with
                 'npm config set' after the ones given to the constructor
        workdir: directory to run the commands from (default: a temporary
                 directory removed once the command has returned)

        Returns whatever runfetchcmd() returns for the command.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            d = bb.data.createCopy(self.d)
            d.setVar("HOME", tmpdir)

            if not workdir:
                workdir = tmpdir

            # The config file lives in the temporary directory. Its path is
            # quoted like every other value interpolated in the command
            # line, so that a temporary directory whose path contains
            # spaces or other special characters doesn't break the command.
            cfgfile = shlex.quote(os.path.join(tmpdir, "npmrc"))
            env_prefix = "NPM_CONFIG_GLOBALCONFIG=%s " % cfgfile
            env_prefix += "NPM_CONFIG_USERCONFIG=%s " % cfgfile

            def _run(cmd):
                # Both the user and the global config point to the same file
                return runfetchcmd(env_prefix + cmd, d, workdir=workdir)

            # The instance configs are applied first, then the call ones
            for group in (self.configs, configs):
                for key, value in group or ():
                    _run("npm config set %s %s" % (key, shlex.quote(value)))

            for key, value in args or ():
                cmd += " --%s=%s" % (key, shlex.quote(value))

            return _run(cmd)

class Npm(FetchMethod):
    """Fetch method retrieving a package from a npm registry"""

    def supports(self, ud, d):
        """Tell whether the url type is the one handled by this fetcher"""
        return ud.type == "npm"

    def urldata_init(self, ud, d):
        """Populate the url data with the npm specific attributes"""
        parm = ud.parm

        ud.registry = None
        ud.version = None

        # The 'package' parameter is mandatory
        ud.package = parm.get("package")
        if not ud.package:
            raise MissingParameterError("Parameter 'package' required", ud.url)

        # The 'version' parameter is mandatory and has to be either a
        # semantic version or 'latest'
        ud.version = parm.get("version")
        if not ud.version:
            raise MissingParameterError("Parameter 'version' required", ud.url)

        if not (is_semver(ud.version) or ud.version == "latest"):
            raise ParameterError("Invalid 'version' parameter", ud.url)

        # The registry is the url without its parameters, the 'npm' scheme
        # being replaced by 'http'
        bare_url = ud.url.split(";")[0]
        ud.registry = re.sub(r"^npm://", "http://", bare_url)

        # The local filename is the 'downloadfilename' parameter if given,
        # otherwise it is derived from the package name and version
        if "downloadfilename" not in parm:
            ud.localfile = npm_localfile(ud.package, ud.version)
        else:
            ud.localfile = d.expand(parm["downloadfilename"])

        # The base 'npm' command can be overridden with FETCHCMD_npm
        ud.basecmd = d.getVar("FETCHCMD_npm") or "npm"

        # The package name and version are resolved into a URI which is
        # handed over to a proxy fetcher. The resolved URI is stored in a
        # resolve file, so no network access is needed to resolve it again
        # when that file already exists. The donestamp file, the lockfile
        # and the checksums are all managed by the proxy fetcher.
        ud.proxy = None
        ud.needdonestamp = False
        ud.resolvefile = self.localpath(ud, d) + ".resolved"

    def _resolve_proxy_url(self, ud, d):
        """Resolve the tarball url using 'npm view' and write the resolve file"""

        def _query_registry():
            # Ask the registry for the package description, as json
            pkgver = shlex.quote(ud.package + "@" + ud.version)
            cmd = "%s view %s" % (ud.basecmd, pkgver)
            configs = [
                ("json", "true"),
                ("registry", ud.registry),
            ]
            env = NpmEnvironment(d)
            check_network_access(d, cmd, ud.registry)
            output = env.run(cmd, configs=configs)

            if not output:
                raise FetchError("Unavailable package %s" % pkgver, ud.url)

            # Any failure below, including the errors raised on purpose, is
            # reported as a FetchError.
            try:
                view = json.loads(output)

                error = view.get("error")
                if error is not None:
                    raise FetchError(error.get("summary"), ud.url)

                if ud.version != "latest":
                    if ud.version != view.get("version"):
                        raise ParameterError("Invalid 'version' parameter",
                                             ud.url)
                else:
                    bb.warn("The npm package %s is using the latest version "
                            "available. This could lead to non-reproducible "
                            "builds." % pkgver)

                return view

            except Exception as e:
                raise FetchError("Invalid view from npm: %s" % str(e), ud.url)

        def _build_proxy_url(view):
            # Turn the 'dist' section of the view into the proxy url
            tarball_url = view.get("dist", {}).get("tarball")
            if tarball_url is None:
                raise FetchError("Invalid 'dist.tarball' in view", ud.url)

            uri = URI(tarball_url)
            uri.params["downloadfilename"] = ud.localfile

            dist = view.get("dist", {})
            integrity = dist.get("integrity")
            shasum = dist.get("shasum")

            # The integrity is used in priority, the sha1 sum otherwise
            if integrity is not None:
                name, expected = npm_integrity(integrity)
                uri.params[name] = expected
            elif shasum is not None:
                uri.params["sha1sum"] = shasum
            else:
                raise FetchError("Invalid 'dist.integrity' in view", ud.url)

            return str(uri)

        resolved_url = _build_proxy_url(_query_registry())

        bb.utils.mkdirhier(os.path.dirname(ud.resolvefile))
        with open(ud.resolvefile, "w") as resolve:
            resolve.write(resolved_url)

    def _setup_proxy(self, ud, d):
        """Create the proxy fetcher from the resolve file, if not done yet"""
        if ud.proxy is not None:
            return

        if not os.path.exists(ud.resolvefile):
            self._resolve_proxy_url(ud, d)

        with open(ud.resolvefile, "r") as resolve:
            resolved_url = resolve.read()

        # Avoid conflicts between the environment data and:
        # - the proxy url checksum
        proxy_data = bb.data.createCopy(d)
        proxy_data.delVarFlags("SRC_URI")
        ud.proxy = Fetch([resolved_url], proxy_data)

    def _get_proxy_method(self, ud, d):
        """Return the fetch method, url data and datastore of the proxy"""
        self._setup_proxy(ud, d)
        proxy = ud.proxy
        proxy_ud = proxy.ud[proxy.urls[0]]
        proxy_d = proxy.d
        proxy_ud.setup_localpath(proxy_d)
        return proxy_ud.method, proxy_ud, proxy_d

    def verify_donestamp(self, ud, d):
        """Verify the donestamp file through the proxy fetcher"""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.verify_donestamp(proxy_ud, proxy_d)

    def update_donestamp(self, ud, d):
        """Update the donestamp file through the proxy fetcher"""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        method.update_donestamp(proxy_ud, proxy_d)

    def need_update(self, ud, d):
        """Force a fetch, even if localpath exists ?"""
        # An unresolved url or the 'latest' version always need an update
        if not os.path.exists(ud.resolvefile) or ud.version == "latest":
            return True
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.need_update(proxy_ud, proxy_d)

    def try_mirrors(self, fetch, ud, d, mirrors):
        """Try to use a mirror, through the proxy fetcher"""
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.try_mirrors(fetch, proxy_ud, proxy_d, mirrors)

    def download(self, ud, d):
        """Fetch the url using the proxy fetcher"""
        self._setup_proxy(ud, d)
        ud.proxy.download()

    def unpack(self, ud, rootdir, d):
        """Unpack the downloaded archive below rootdir"""
        # The 'destsuffix' parameter defaults to 'npm'
        subdir = ud.parm.get("destsuffix", "npm")
        npm_unpack(ud.localpath, os.path.join(rootdir, subdir), d)

    def clean(self, ud, d):
        """Clean any existing full or partial download"""
        # Without a resolve file there is nothing to clean
        if not os.path.exists(ud.resolvefile):
            return
        self._setup_proxy(ud, d)
        ud.proxy.clean()
        bb.utils.remove(ud.resolvefile)

    def done(self, ud, d):
        """Is the download done ?"""
        # The download can't be done as long as the url isn't resolved
        if not os.path.exists(ud.resolvefile):
            return False
        method, proxy_ud, proxy_d = self._get_proxy_method(ud, d)
        return method.done(proxy_ud, proxy_d)