See More

#!/usr/bin/env python3 # # bugzilla - a commandline frontend for the python bugzilla module # # Copyright (C) 2007-2017 Red Hat Inc. # Author: Will Woods # Author: Cole Robinson # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. See http://www.gnu.org/copyleft/gpl.html for # the full text of the license. from __future__ import print_function import locale from logging import getLogger, DEBUG, INFO, WARN, StreamHandler, Formatter import argparse import os import re import socket import sys import tempfile # pylint: disable=import-error if sys.version_info[0] >= 3: # pylint: disable=no-name-in-module,redefined-builtin from xmlrpc.client import Fault, ProtocolError from urllib.parse import urlparse basestring = (str, bytes) else: from xmlrpclib import Fault, ProtocolError from urlparse import urlparse # pylint: enable=import-error import requests.exceptions import bugzilla DEFAULT_BZ = 'https://bugzilla.redhat.com/xmlrpc.cgi' format_field_re = re.compile("%{([a-z0-9_]+)(?::([^}]*))?}") log = getLogger(bugzilla.__name__) ################ # Util helpers # ################ def _is_unittest(): return bool(os.getenv("__BUGZILLA_UNITTEST")) def _is_unittest_debug(): return bool(os.getenv("__BUGZILLA_UNITTEST_DEBUG")) def to_encoding(ustring): string = '' if isinstance(ustring, basestring): string = ustring elif ustring is not None: string = str(ustring) if sys.version_info[0] >= 3: return string preferred = locale.getpreferredencoding() if _is_unittest(): preferred = "UTF-8" return string.encode(preferred, 'replace') def open_without_clobber(name, *args): """ Try to open the given file with the given mode; if that filename exists, try "name.1", "name.2", etc. until we find an unused filename. """ fd = None count = 1 orig_name = name while fd is None: try: fd = os.open(name, os.O_CREAT | os.O_EXCL, 0o666) except OSError as err: if err.errno == os.errno.EEXIST: name = "%s.%i" % (orig_name, count) count += 1 else: raise IOError(err.errno, err.strerror, err.filename) fobj = open(name, *args) if fd != fobj.fileno(): os.close(fd) return fobj def get_default_url(): """ Grab a default URL from bugzillarc [DEFAULT] url=X """ from bugzilla.base import _open_bugzillarc cfg = _open_bugzillarc() if cfg: cfgurl = cfg.defaults().get("url", None) if cfgurl is not None: log.debug("bugzillarc: found cli url=%s", cfgurl) return cfgurl return DEFAULT_BZ def setup_logging(debug, verbose): handler = StreamHandler(sys.stderr) handler.setFormatter(Formatter( "[%(asctime)s] %(levelname)s (%(module)s:%(lineno)d) %(message)s", "%H:%M:%S")) log.addHandler(handler) if debug: log.setLevel(DEBUG) elif verbose: log.setLevel(INFO) else: log.setLevel(WARN) if _is_unittest_debug(): log.setLevel(DEBUG) ################## # Option parsing # ################## def _setup_root_parser(): epilog = 'Try "bugzilla COMMAND --help" for command-specific help.' p = argparse.ArgumentParser(epilog=epilog) default_url = get_default_url() # General bugzilla connection options p.add_argument('--bugzilla', default=default_url, help="bugzilla XMLRPC URI. default: %s" % default_url) p.add_argument("--nosslverify", dest="sslverify", action="store_false", default=True, help="Don't error on invalid bugzilla SSL certificate") p.add_argument('--cert', help="client side certificate file needed by the webserver") p.add_argument('--login', action="store_true", help='Run interactive "login" before performing the ' 'specified command.') p.add_argument('--username', help="Log in with this username") p.add_argument('--password', help="Log in with this password") p.add_argument('--ensure-logged-in', action="store_true", help="Raise an error if we aren't logged in to bugzilla. " "Consider using this if you are depending on " "cached credentials, to ensure that when they expire the " "tool errors, rather than subtly change output.") p.add_argument('--no-cache-credentials', action='store_false', default=True, dest='cache_credentials', help="Don't save any bugzilla cookies or tokens to disk, and " "don't use any pre-existing credentials.") p.add_argument('--cookiefile', default=None, help="cookie file to use for bugzilla authentication") p.add_argument('--tokenfile', default=None, help="token file to use for bugzilla authentication") p.add_argument('--verbose', action='store_true', help="give more info about what's going on") p.add_argument('--debug', action='store_true', help="output bunches of debugging info") p.add_argument('--version', action='version', version=bugzilla.__version__) # Allow user to specify BZClass to initialize. Kinda weird for the # CLI, I'd rather people file bugs about this so we can fix our detection. # So hide it from the help output but keep it for back compat p.add_argument('--bztype', default='auto', help=argparse.SUPPRESS) return p def _parser_add_output_options(p): outg = p.add_argument_group("Output format options") outg.add_argument('--full', action='store_const', dest='output', const='full', default='normal', help="output detailed bug info") outg.add_argument('-i', '--ids', action='store_const', dest='output', const='ids', help="output only bug IDs") outg.add_argument('-e', '--extra', action='store_const', dest='output', const='extra', help="output additional bug information " "(keywords, Whiteboards, etc.)") outg.add_argument('--oneline', action='store_const', dest='output', const='oneline', help="one line summary of the bug (useful for scripts)") outg.add_argument('--raw', action='store_const', dest='output', const='raw', help="raw output of the bugzilla contents") outg.add_argument('--outputformat', help="Print output in the form given. " "You can use RPM-style tags that match bug " "fields, e.g.: '%%{id}: %%{summary}'. See the man page " "section 'Output options' for more details.") def _parser_add_bz_fields(rootp, command): cmd_new = (command == "new") cmd_query = (command == "query") cmd_modify = (command == "modify") if cmd_new: comment_help = "Set initial bug comment/description" elif cmd_query: comment_help = "Search all bug comments" else: comment_help = "Add new bug comment" p = rootp.add_argument_group("Standard bugzilla options") p.add_argument('-p', '--product', help="Product name") p.add_argument('-v', '--version', help="Product version") p.add_argument('-c', '--component', help="Component name") p.add_argument('-t', '--summary', '--short_desc', help="Bug summary") p.add_argument('-l', '--comment', '--long_desc', help=comment_help) if not cmd_query: p.add_argument("--comment-tag", action="append", help="Comment tag for the new comment") p.add_argument("--sub-component", action="append", help="RHBZ sub component field") p.add_argument('-o', '--os', help="Operating system") p.add_argument('--arch', help="Arch this bug occurs on") p.add_argument('-x', '--severity', help="Bug severity") p.add_argument('-z', '--priority', help="Bug priority") p.add_argument('--alias', help='Bug alias (name)') p.add_argument('-s', '--status', '--bug_status', help='Bug status (NEW, ASSIGNED, etc.)') p.add_argument('-u', '--url', help="URL field") p.add_argument('-m', '--target_milestone', help="Target milestone") p.add_argument('--target_release', help="RHBZ Target release") p.add_argument('--blocked', action="append", help="Bug IDs that this bug blocks") p.add_argument('--dependson', action="append", help="Bug IDs that this bug depends on") p.add_argument('--keywords', action="append", help="Bug keywords") p.add_argument('--groups', action="append", help="Which user groups can view this bug") p.add_argument('--cc', action="append", help="CC list") p.add_argument('-a', '--assigned_to', '--assignee', help="Bug assignee") p.add_argument('-q', '--qa_contact', help='QA contact') if not cmd_new: p.add_argument('-f', '--flag', action='append', help="Bug flags state. Ex:\n" " --flag needinfo?\n" " --flag dev_ack+ \n" " clear with --flag needinfoX") p.add_argument("--tags", action="append", help="Tags/Personal Tags field.") p.add_argument('-w', "--whiteboard", '--status_whiteboard', action="append", help='Whiteboard field') p.add_argument("--devel_whiteboard", action="append", help='RHBZ devel whiteboard field') p.add_argument("--internal_whiteboard", action="append", help='RHBZ internal whiteboard field') p.add_argument("--qa_whiteboard", action="append", help='RHBZ QA whiteboard field') p.add_argument('-F', '--fixed_in', help="RHBZ 'Fixed in version' field") # Put this at the end, so it sticks out more p.add_argument('--field', metavar="FIELD=VALUE", action="append", dest="fields", help="Manually specify a bugzilla XMLRPC field. FIELD is " "the raw name used by the bugzilla instance. For example if your " "bugzilla instance has a custom field cf_my_field, do:\n" " --field cf_my_field=VALUE") # Used by unit tests, not for end user consumption p.add_argument('--__test-return-result', action="store_true", dest="test_return_result", help=argparse.SUPPRESS) if not cmd_modify: _parser_add_output_options(rootp) def _setup_action_new_parser(subparsers): description = ("Create a new bug report. " "--product, --component, --version, --summary, and --comment " "must be specified. " "Options that take multiple values accept comma separated lists, " "including --cc, --blocks, --dependson, --groups, and --keywords.") p = subparsers.add_parser("new", description=description) _parser_add_bz_fields(p, "new") def _setup_action_query_parser(subparsers): description = ("List bug reports that match the given criteria. " "Certain options can accept a comma separated list to query multiple " "values, including --status, --component, --product, --version, --id.") epilog = ("Note: querying via explicit command line options will only " "get you so far. See the --from-url option for a way to use powerful " "Web UI queries from the command line.") p = subparsers.add_parser("query", description=description, epilog=epilog) _parser_add_bz_fields(p, "query") g = p.add_argument_group("'query' specific options") g.add_argument('-b', '--id', '--bug_id', help="specify individual bugs by IDs, separated with commas") g.add_argument('-r', '--reporter', help="Email: search reporter email for given address") g.add_argument('--quicksearch', help="Search using bugzilla's quicksearch functionality.") g.add_argument('--savedsearch', help="Name of a bugzilla saved search. If you don't own this " "saved search, you must passed --savedsearch_sharer_id.") g.add_argument('--savedsearch-sharer-id', help="Owner ID of the --savedsearch. You can get this ID from " "the URL bugzilla generates when running the saved search " "from the web UI.") # Keep this at the end so it sticks out more g.add_argument('--from-url', metavar="WEB_QUERY_URL", help="Make a working query via bugzilla's 'Advanced search' web UI, " "grab the url from your browser (the string with query.cgi or " "buglist.cgi in it), and --from-url will run it via the " "bugzilla API. Don't forget to quote the string! " "This only works for Bugzilla 5 and Red Hat bugzilla") # Deprecated options p.add_argument('-E', '--emailtype', help=argparse.SUPPRESS) p.add_argument('--components_file', help=argparse.SUPPRESS) p.add_argument('-U', '--url_type', help=argparse.SUPPRESS) p.add_argument('-K', '--keywords_type', help=argparse.SUPPRESS) p.add_argument('-W', '--status_whiteboard_type', help=argparse.SUPPRESS) p.add_argument('-B', '--booleantype', help=argparse.SUPPRESS) p.add_argument('--boolean_query', action="append", help=argparse.SUPPRESS) p.add_argument('--fixed_in_type', help=argparse.SUPPRESS) def _setup_action_info_parser(subparsers): description = ("List products or component information about the " "bugzilla server.") p = subparsers.add_parser("info", description=description) x = p.add_mutually_exclusive_group(required=True) x.add_argument('-p', '--products', action='store_true', help='Get a list of products') x.add_argument('-c', '--components', metavar="PRODUCT", help='List the components in the given product') x.add_argument('-o', '--component_owners', metavar="PRODUCT", help='List components (and their owners)') x.add_argument('-v', '--versions', metavar="PRODUCT", help='List the versions for the given product') p.add_argument('--active-components', action="store_true", help='Only show active components. Combine with --components*') def _setup_action_modify_parser(subparsers): usage = ("bugzilla modify [options] BUGID [BUGID...]\n" "Fields that take multiple values have a special input format.\n" "Append: [email protected]\n" "Overwrite: [email protected]\n" "Remove: [email protected]\n" "Options that accept this format: --cc, --blocked, --dependson,\n" " --groups, --tags, whiteboard fields.") p = subparsers.add_parser("modify", usage=usage) _parser_add_bz_fields(p, "modify") g = p.add_argument_group("'modify' specific options") g.add_argument("ids", nargs="+", help="Bug IDs to modify") g.add_argument('-k', '--close', metavar="RESOLUTION", help='Close with the given resolution (WONTFIX, NOTABUG, etc.)') g.add_argument('-d', '--dupeid', metavar="ORIGINAL", help='ID of original bug. Implies --close DUPLICATE') g.add_argument('--private', action='store_true', default=False, help='Mark new comment as private') g.add_argument('--reset-assignee', action="store_true", help='Reset assignee to component default') g.add_argument('--reset-qa-contact', action="store_true", help='Reset QA contact to component default') def _setup_action_attach_parser(subparsers): usage = """ bugzilla attach --file=FILE --desc=DESC [--type=TYPE] BUGID [BUGID...] bugzilla attach --get=ATTACHID --getall=BUGID [...] bugzilla attach --type=TYPE BUGID [BUGID...]""" description = "Attach files or download attachments." p = subparsers.add_parser("attach", description=description, usage=usage) p.add_argument("ids", nargs="*", help="BUGID references") p.add_argument('-f', '--file', metavar="FILENAME", help='File to attach, or filename for data provided on stdin') p.add_argument('-d', '--description', '--summary', metavar="SUMMARY", dest='desc', help="A short summary of the file being attached") p.add_argument('-t', '--type', metavar="MIMETYPE", help="Mime-type for the file being attached") p.add_argument('-g', '--get', metavar="ATTACHID", action="append", default=[], help="Download the attachment with the given ID") p.add_argument("--getall", "--get-all", metavar="BUGID", action="append", default=[], help="Download all attachments on the given bug") p.add_argument('-l', '--comment', '--long_desc', help="Add comment with attachment") def _setup_action_login_parser(subparsers): usage = 'bugzilla login [username [password]]' description = "Log into bugzilla and save a login cookie or token." p = subparsers.add_parser("login", description=description, usage=usage) p.add_argument("pos_username", nargs="?", help="Optional username", metavar="username") p.add_argument("pos_password", nargs="?", help="Optional password", metavar="password") def setup_parser(): rootparser = _setup_root_parser() subparsers = rootparser.add_subparsers(dest="command") subparsers.required = True _setup_action_new_parser(subparsers) _setup_action_query_parser(subparsers) _setup_action_info_parser(subparsers) _setup_action_modify_parser(subparsers) _setup_action_attach_parser(subparsers) _setup_action_login_parser(subparsers) return rootparser #################### # Command routines # #################### def _merge_field_opts(query, opt, parser): # Add any custom fields if specified if opt.fields is None: return for f in opt.fields: try: f, v = f.split('=', 1) query[f] = v except Exception: parser.error("Invalid field argument provided: %s" % (f)) def _do_query(bz, opt, parser): q = {} # Parse preconstructed queries. u = opt.from_url if u: q = bz.url_to_query(u) if opt.components_file: # Components slurped in from file (one component per line) # This can be made more robust clist = [] f = open(opt.components_file, 'r') for line in f.readlines(): line = line.rstrip("\n") clist.append(line) opt.component = clist if opt.status: val = opt.status stat = val if val == 'ALL': # leaving this out should return bugs of any status stat = None elif val == 'DEV': # Alias for all development bug statuses stat = ['NEW', 'ASSIGNED', 'NEEDINFO', 'ON_DEV', 'MODIFIED', 'POST', 'REOPENED'] elif val == 'QE': # Alias for all QE relevant bug statuses stat = ['ASSIGNED', 'ON_QA', 'FAILS_QA', 'PASSES_QA'] elif val == 'EOL': # Alias for EndOfLife bug statuses stat = ['VERIFIED', 'RELEASE_PENDING', 'CLOSED'] elif val == 'OPEN': # non-Closed statuses stat = ['NEW', 'ASSIGNED', 'MODIFIED', 'ON_DEV', 'ON_QA', 'VERIFIED', 'RELEASE_PENDING', 'POST'] opt.status = stat # Convert all comma separated list parameters to actual lists, # which is what bugzilla wants # According to bugzilla docs, any parameter can be a list, but # let's only do this for options we explicitly mention can be # comma separated. for optname in ["severity", "id", "status", "component", "priority", "product", "version"]: val = getattr(opt, optname, None) if not isinstance(val, str): continue setattr(opt, optname, val.split(",")) include_fields = None if opt.output == 'raw': # 'raw' always does a getbug() call anyways, so just ask for ID back include_fields = ['id'] elif opt.outputformat: include_fields = [] for fieldname, rest in format_field_re.findall(opt.outputformat): if fieldname == "whiteboard" and rest: fieldname = rest + "_" + fieldname elif fieldname == "flag": fieldname = "flags" elif fieldname == "cve": fieldname = ["keywords", "blocks"] elif fieldname == "__unicode__": # Needs to be in sync with bug.__unicode__ fieldname = ["id", "status", "assigned_to", "summary"] flist = isinstance(fieldname, list) and fieldname or [fieldname] for f in flist: if f not in include_fields: include_fields.append(f) if include_fields is not None: include_fields.sort() built_query = bz.build_query( product=opt.product or None, component=opt.component or None, sub_component=opt.sub_component or None, version=opt.version or None, reporter=opt.reporter or None, bug_id=opt.id or None, short_desc=opt.summary or None, long_desc=opt.comment or None, cc=opt.cc or None, assigned_to=opt.assigned_to or None, qa_contact=opt.qa_contact or None, status=opt.status or None, blocked=opt.blocked or None, dependson=opt.dependson or None, keywords=opt.keywords or None, keywords_type=opt.keywords_type or None, url=opt.url or None, url_type=opt.url_type or None, status_whiteboard=opt.whiteboard or None, status_whiteboard_type=opt.status_whiteboard_type or None, fixed_in=opt.fixed_in or None, fixed_in_type=opt.fixed_in_type or None, flag=opt.flag or None, alias=opt.alias or None, qa_whiteboard=opt.qa_whiteboard or None, devel_whiteboard=opt.devel_whiteboard or None, boolean_query=opt.boolean_query or None, bug_severity=opt.severity or None, priority=opt.priority or None, target_release=opt.target_release or None, target_milestone=opt.target_milestone or None, emailtype=opt.emailtype or None, booleantype=opt.booleantype or None, include_fields=include_fields, quicksearch=opt.quicksearch or None, savedsearch=opt.savedsearch or None, savedsearch_sharer_id=opt.savedsearch_sharer_id or None, tags=opt.tags or None) _merge_field_opts(built_query, opt, parser) built_query.update(q) q = built_query if not q: parser.error("'query' command requires additional arguments") if opt.test_return_result: return q return bz.query(q) def _do_info(bz, opt): """ Handle the 'info' subcommand """ # All these commands call getproducts internally, so do it up front # with minimal include_fields for speed def _filter_components(compdetails): ret = {} for k, v in compdetails.items(): if v.get("is_active", True): ret[k] = v return ret productname = (opt.components or opt.component_owners or opt.versions) include_fields = ["name", "id"] fastcomponents = (opt.components and not opt.active_components) if opt.versions: include_fields += ["versions"] if opt.component_owners: include_fields += [ "components.default_assigned_to", "components.name", ] if (opt.active_components and any(["components" in i for i in include_fields])): include_fields += ["components.is_active"] bz.refresh_products(names=productname and [productname] or None, include_fields=include_fields) if opt.products: for name in sorted([p["name"] for p in bz.getproducts()]): print(name) elif fastcomponents: for name in sorted(bz.getcomponents(productname)): print(name) elif opt.components: details = bz.getcomponentsdetails(productname) for name in sorted(_filter_components(details)): print(name) elif opt.versions: proddict = bz.getproducts()[0] for v in proddict['versions']: print(to_encoding(v["name"])) elif opt.component_owners: details = bz.getcomponentsdetails(productname) for c in sorted(_filter_components(details)): print(to_encoding(u"%s: %s" % (c, details[c]['default_assigned_to']))) def _convert_to_outputformat(output): fmt = "" if output == "normal": fmt = "%{__unicode__}" elif output == "ids": fmt = "%{id}" elif output == 'full': fmt += "%{__unicode__}\n" fmt += "Component: %{component}\n" fmt += "CC: %{cc}\n" fmt += "Blocked: %{blocks}\n" fmt += "Depends: %{depends_on}\n" fmt += "%{comments}\n" elif output == 'extra': fmt += "%{__unicode__}\n" fmt += " +Keywords: %{keywords}\n" fmt += " +QA Whiteboard: %{qa_whiteboard}\n" fmt += " +Status Whiteboard: %{status_whiteboard}\n" fmt += " +Devel Whiteboard: %{devel_whiteboard}\n" elif output == 'oneline': fmt += "#%{bug_id} %{status} %{assigned_to} %{component}\t" fmt += "[%{target_milestone}] %{flags} %{cve}" else: raise RuntimeError("Unknown output type '%s'" % output) return fmt def _format_output(bz, opt, buglist): if opt.output == 'raw': buglist = bz.getbugs([b.bug_id for b in buglist]) for b in buglist: print("Bugzilla %s: " % b.bug_id) for attrname in sorted(b.__dict__): print(to_encoding(u"ATTRIBUTE[%s]: %s" % (attrname, b.__dict__[attrname]))) print("\n\n") return def bug_field(matchobj): # whiteboard and flag allow doing # %{whiteboard:devel} and %{flag:needinfo} # That's what 'rest' matches (fieldname, rest) = matchobj.groups() if fieldname == "whiteboard" and rest: fieldname = rest + "_" + fieldname if fieldname == "flag" and rest: val = b.get_flag_status(rest) elif fieldname == "flags" or fieldname == "flags_requestee": tmpstr = [] for f in getattr(b, "flags", []): requestee = f.get('requestee', "") if fieldname == "flags": requestee = "" if fieldname == "flags_requestee": if requestee == "": continue tmpstr.append("%s" % requestee) else: tmpstr.append("%s%s%s" % (f['name'], f['status'], requestee)) val = ",".join(tmpstr) elif fieldname == "cve": cves = [] for key in getattr(b, "keywords", []): # grab CVE from keywords and blockers if key.find("Security") == -1: continue for bl in b.blocks: cvebug = bz.getbug(bl) for cb in cvebug.alias: if cb.find("CVE") == -1: continue if cb.strip() not in cves: cves.append(cb) val = ",".join(cves) elif fieldname == "comments": val = "" for c in getattr(b, "comments", []): val += ("\n* %s - %s:\n%s\n" % (c['time'], c.get("creator", c.get("author", "")), c['text'])) elif fieldname == "external_bugs": val = "" for e in getattr(b, "external_bugs", []): url = e["type"]["full_url"].replace("%id%", e["ext_bz_bug_id"]) if not val: val += "\n" val += "External bug: %s\n" % url elif fieldname == "__unicode__": val = b.__unicode__() else: val = getattr(b, fieldname, "") vallist = isinstance(val, list) and val or [val] val = ','.join([to_encoding(v) for v in vallist]) return val for b in buglist: print(format_field_re.sub(bug_field, opt.outputformat)) def _parse_triset(vallist, checkplus=True, checkminus=True, checkequal=True, splitcomma=False): add_val = [] rm_val = [] set_val = None def make_list(v): if not v: return [] if splitcomma: return v.split(",") return [v] for val in isinstance(vallist, list) and vallist or [vallist]: val = val or "" if val.startswith("+") and checkplus: add_val += make_list(val[1:]) elif val.startswith("-") and checkminus: rm_val += make_list(val[1:]) elif val.startswith("=") and checkequal: # Intentionally overwrite this set_val = make_list(val[1:]) else: add_val += make_list(val) return add_val, rm_val, set_val def _do_new(bz, opt, parser): # Parse options that accept comma separated list def parse_multi(val): return _parse_triset(val, checkplus=False, checkminus=False, checkequal=False, splitcomma=True)[0] ret = bz.build_createbug( blocks=parse_multi(opt.blocked) or None, cc=parse_multi(opt.cc) or None, component=opt.component or None, depends_on=parse_multi(opt.dependson) or None, description=opt.comment or None, groups=parse_multi(opt.groups) or None, keywords=parse_multi(opt.keywords) or None, op_sys=opt.os or None, platform=opt.arch or None, priority=opt.priority or None, product=opt.product or None, severity=opt.severity or None, summary=opt.summary or None, url=opt.url or None, version=opt.version or None, assigned_to=opt.assigned_to or None, qa_contact=opt.qa_contact or None, sub_component=opt.sub_component or None, alias=opt.alias or None, comment_tags=opt.comment_tag or None, ) _merge_field_opts(ret, opt, parser) if opt.test_return_result: return ret b = bz.createbug(ret) b.refresh() return [b] def _do_modify(bz, parser, opt): bugid_list = [bugid for a in opt.ids for bugid in a.split(',')] add_wb, rm_wb, set_wb = _parse_triset(opt.whiteboard) add_devwb, rm_devwb, set_devwb = _parse_triset(opt.devel_whiteboard) add_intwb, rm_intwb, set_intwb = _parse_triset(opt.internal_whiteboard) add_qawb, rm_qawb, set_qawb = _parse_triset(opt.qa_whiteboard) add_blk, rm_blk, set_blk = _parse_triset(opt.blocked, splitcomma=True) add_deps, rm_deps, set_deps = _parse_triset(opt.dependson, splitcomma=True) add_key, rm_key, set_key = _parse_triset(opt.keywords) add_cc, rm_cc, ignore = _parse_triset(opt.cc, checkplus=False, checkequal=False) add_groups, rm_groups, ignore = _parse_triset(opt.groups, checkequal=False, splitcomma=True) add_tags, rm_tags, ignore = _parse_triset(opt.tags, checkequal=False) status = opt.status or None if opt.dupeid is not None: opt.close = "DUPLICATE" if opt.close: status = "CLOSED" flags = [] if opt.flag: # Convert "foo+" to tuple ("foo", "+") for f in opt.flag: flags.append({"name": f[:-1], "status": f[-1]}) update = bz.build_update( assigned_to=opt.assigned_to or None, comment=opt.comment or None, comment_private=opt.private or None, component=opt.component or None, product=opt.product or None, blocks_add=add_blk or None, blocks_remove=rm_blk or None, blocks_set=set_blk, url=opt.url or None, cc_add=add_cc or None, cc_remove=rm_cc or None, depends_on_add=add_deps or None, depends_on_remove=rm_deps or None, depends_on_set=set_deps, groups_add=add_groups or None, groups_remove=rm_groups or None, keywords_add=add_key or None, keywords_remove=rm_key or None, keywords_set=set_key, op_sys=opt.os or None, platform=opt.arch or None, priority=opt.priority or None, qa_contact=opt.qa_contact or None, severity=opt.severity or None, status=status, summary=opt.summary or None, version=opt.version or None, reset_assigned_to=opt.reset_assignee or None, reset_qa_contact=opt.reset_qa_contact or None, resolution=opt.close or None, target_release=opt.target_release or None, target_milestone=opt.target_milestone or None, dupe_of=opt.dupeid or None, fixed_in=opt.fixed_in or None, whiteboard=set_wb and set_wb[0] or None, devel_whiteboard=set_devwb and set_devwb[0] or None, internal_whiteboard=set_intwb and set_intwb[0] or None, qa_whiteboard=set_qawb and set_qawb[0] or None, sub_component=opt.sub_component or None, alias=opt.alias or None, flags=flags or None, comment_tags=opt.comment_tag or None, ) # We make this a little convoluted to facilitate unit testing wbmap = { "whiteboard": (add_wb, rm_wb), "internal_whiteboard": (add_intwb, rm_intwb), "qa_whiteboard": (add_qawb, rm_qawb), "devel_whiteboard": (add_devwb, rm_devwb), } for k, v in wbmap.copy().items(): if not v[0] and not v[1]: del(wbmap[k]) _merge_field_opts(update, opt, parser) log.debug("update bug dict=%s", update) log.debug("update whiteboard dict=%s", wbmap) if not any([update, wbmap, add_tags, rm_tags]): parser.error("'modify' command requires additional arguments") if opt.test_return_result: return (update, wbmap, add_tags, rm_tags) if add_tags or rm_tags: ret = bz.update_tags(bugid_list, tags_add=add_tags, tags_remove=rm_tags) log.debug("bz.update_tags returned=%s", ret) if update: ret = bz.update_bugs(bugid_list, update) log.debug("bz.update_bugs returned=%s", ret) if not wbmap: return # Now for the things we can't blindly batch. # Being able to prepend/append to whiteboards, which are just # plain string values, is an old rhbz semantic that we try to maintain # here. This is a bit weird for traditional bugzilla XMLRPC log.debug("Adjusting whiteboard fields one by one") for bug in bz.getbugs(bugid_list): for wb, (add_list, rm_list) in wbmap.items(): for tag in add_list: newval = getattr(bug, wb) or "" if newval: newval += " " newval += tag bz.update_bugs([bug.id], bz.build_update(**{wb: newval})) for tag in rm_list: newval = (getattr(bug, wb) or "").split() for t in newval[:]: if t == tag: newval.remove(t) bz.update_bugs([bug.id], bz.build_update(**{wb: " ".join(newval)})) def _do_get_attach(bz, opt): for bug in bz.getbugs(opt.getall): opt.get += bug.get_attachment_ids() for attid in set(opt.get): att = bz.openattachment(attid) outfile = open_without_clobber(att.name, "wb") data = att.read(4096) while data: outfile.write(data) data = att.read(4096) print("Wrote %s" % outfile.name) return def _do_set_attach(bz, opt, parser): if not opt.ids: parser.error("Bug ID must be specified for setting attachments") if sys.stdin.isatty(): if not opt.file: parser.error("--file must be specified") fileobj = open(opt.file, "rb") else: # piped input on stdin if not opt.desc: parser.error("--description must be specified if passing " "file on stdin") fileobj = tempfile.NamedTemporaryFile(prefix="bugzilla-attach.") data = sys.stdin.read(4096) while data: fileobj.write(data.encode(locale.getpreferredencoding())) data = sys.stdin.read(4096) fileobj.seek(0) kwargs = {} if opt.file: kwargs["filename"] = os.path.basename(opt.file) if opt.type: kwargs["contenttype"] = opt.type if opt.type in ["text/x-patch"]: kwargs["ispatch"] = True if opt.comment: kwargs["comment"] = opt.comment desc = opt.desc or os.path.basename(fileobj.name) # Upload attachments for bugid in opt.ids: attid = bz.attachfile(bugid, fileobj, desc, **kwargs) print("Created attachment %i on bug %s" % (attid, bugid)) ################# # Main handling # ################# def _make_bz_instance(opt): """ Build the Bugzilla instance we will use """ if opt.bztype != 'auto': log.info("Explicit --bztype is no longer supported, ignoring") cookiefile = None tokenfile = None if opt.cache_credentials: cookiefile = opt.cookiefile or -1 tokenfile = opt.tokenfile or -1 bz = bugzilla.Bugzilla( url=opt.bugzilla, cookiefile=cookiefile, tokenfile=tokenfile, sslverify=opt.sslverify, cert=opt.cert) return bz def _handle_login(opt, action, bz): """ Handle all login related bits """ is_login_command = (action == 'login') do_interactive_login = (is_login_command or opt.login or opt.username or opt.password) username = getattr(opt, "pos_username", None) or opt.username password = getattr(opt, "pos_password", None) or opt.password try: if do_interactive_login: if bz.url: print("Logging into %s" % urlparse(bz.url)[1]) bz.interactive_login(username, password) except bugzilla.BugzillaError as e: print(str(e)) sys.exit(1) if opt.ensure_logged_in and not bz.logged_in: print("--ensure-logged-in passed but you aren't logged in to %s" % bz.url) sys.exit(1) if is_login_command: msg = "Login successful." if bz.cookiefile or bz.tokenfile: msg = "Login successful, token cache updated." print(msg) sys.exit(0) def _main(unittest_bz_instance): parser = setup_parser() opt = parser.parse_args() action = opt.command setup_logging(opt.debug, opt.verbose) log.debug("Launched with command line: %s", " ".join(sys.argv)) log.debug("Bugzilla module: %s", bugzilla) # Connect to bugzilla log.info('Connecting to %s', opt.bugzilla) if unittest_bz_instance: bz = unittest_bz_instance else: bz = _make_bz_instance(opt) # Handle login options _handle_login(opt, action, bz) ########################### # Run the actual commands # ########################### if hasattr(opt, "outputformat"): if not opt.outputformat and opt.output not in ['raw', None]: opt.outputformat = _convert_to_outputformat(opt.output) buglist = [] if action == 'info': if not (opt.products or opt.components or opt.component_owners or opt.versions): parser.error("'info' command requires additional arguments") _do_info(bz, opt) elif action == 'query': buglist = _do_query(bz, opt, parser) if opt.test_return_result: return buglist elif action == 'new': buglist = _do_new(bz, opt, parser) if opt.test_return_result: return buglist elif action == 'attach': if opt.get or opt.getall: if opt.ids: parser.error("Bug IDs '%s' not used for " "getting attachments" % opt.ids) _do_get_attach(bz, opt) else: _do_set_attach(bz, opt, parser) elif action == 'modify': modout = _do_modify(bz, parser, opt) if opt.test_return_result: return modout else: raise RuntimeError("Unexpected action '%s'" % action) # If we're doing new/query/modify, output our results if action in ['new', 'query']: _format_output(bz, opt, buglist) def main(unittest_bz_instance=None): try: try: return _main(unittest_bz_instance) except (Exception, KeyboardInterrupt): log.debug("", exc_info=True) raise except (Fault, bugzilla.BugzillaError) as e: print("\nServer error: %s" % str(e)) sys.exit(3) except requests.exceptions.SSLError as e: # Give SSL recommendations print("SSL error: %s" % e) print("\nIf you trust the remote server, you can work " "around this error with:\n" " bugzilla --nosslverify ...") sys.exit(4) except (socket.error, requests.exceptions.HTTPError, requests.exceptions.ConnectionError, ProtocolError) as e: print("\nConnection lost/failed: %s" % str(e)) sys.exit(2) def cli(): try: main() except KeyboardInterrupt: log.debug("", exc_info=True) print("\nExited at user request.") sys.exit(1)