root/masterdriverz/snakeoil/snakeoil/formatters.py @ masterdriverz%2540dev-20070818210028-hbpwain0txgykdkz

Revision masterdriverz%2540dev-20070818210028-hbpwain0txgykdkz, 18.5 kB (checked in by Charlie <masterdriverz@…>, 17 months ago)

Set native_PlainTextFormatter.__name__ to 'PlainTextFormatter' to fix pkgcore tests when snakeoil extensions aren't built

Line 
1# Copyright: 2006 Marien Zwart <marienz@gentoo.org>
2# License: GPL2
3
4"""Classes wrapping a file-like object to do fancy output on it."""
5
6import os
7import errno
8
9from snakeoil.klass import GetAttrProxy
10from snakeoil.demandload import demandload
11demandload(globals(), 'locale')
12
13
class native_StreamClosed(KeyboardInterrupt):
    """Signals that the stream a formatter writes to has been closed.

    L{Formatter.write} raises this. It derives from
    C{KeyboardInterrupt} on purpose: the typical trigger is the user
    quitting a pager while the script is still producing output, and
    that situation deserves the same treatment as control+c rather
    than being reported as an error.
    """
22
23
24# "Invalid name" (for fg and bg methods, too short)
25# pylint: disable-msg=C0103
26
27
class Formatter(object):

    """Abstract base class for formatters.

    Most instance attributes have no fixed type; what they hold is up
    to the concrete Formatter subclass.

    @ivar bold: pass this to L{write} to start bold output.
    @ivar underline: pass this to L{write} to start underlined output.
    @ivar reset: pass this to L{write} to end bold and underline.
    @ivar wrap: boolean, true if lines are wrapped automatically
        (off by default).
    @ivar autoline: boolean, true if every write ends with a newline
        (on by default).
    """

    def __init__(self):
        self.wrap = False
        self.autoline = True

    def write(self, *args, **kwargs):
        """Send the arguments to the stream.

        Positional arguments are handled as follows:
          - A string is written to the stream as-is.
          - C{None} is skipped.
          - A function is called with the formatter as its only
            argument; whatever it returns is then processed like any
            other argument.
          - Subclasses may give special treatment to further objects.

        The keyword arguments wrap and autoline override the
        attributes of the same name for the duration of this call.

        The keyword arguments first_prefixes and later_prefixes are
        sequences temporarily appended to the first_prefix and
        later_prefix attributes.

        The keyword argument prefixes is shorthand for passing the
        same value as both first_prefixes and later_prefixes.

        The keyword arguments first_prefix, later_prefix and prefix
        are shorthand for passing a one-element tuple as
        first_prefixes, later_prefixes or prefixes.

        Several formatter attributes are meant to be passed to this
        method as arguments.
        """

    def fg(self, color=None):
        """Switch the foreground color.

        @type  color: a string or C{None}.
        @param color: the color to switch to. When omitted a default
                      is used; C{None} restores the default color.
        """

    def bg(self, color=None):
        """Switch the background color.

        @type  color: a string or C{None}.
        @param color: the color to switch to. When omitted a default
                      is used; C{None} restores the default color.
        """

    def error(self, message):
        """Write message as an error, each line led by a red '!!! '."""
        marker = (self.fg('red'), self.bold, '!!! ', self.reset)
        self.write(message, prefixes=marker)

    def warn(self, message):
        """Write message as a warning, each line led by a yellow '*** '."""
        marker = (self.fg('yellow'), self.bold, '*** ', self.reset)
        self.write(message, prefixes=marker)

    def title(self, string):
        """Set the title to string (this base implementation does nothing)."""
        return None
106
107
108class native_PlainTextFormatter(Formatter):
109
110    """Formatter writing plain text to a file-like object.
111
112    @ivar width: contains the current maximum line length.
113    @ivar encoding: the encoding unicode strings should be converted to.
114    @ivar first_prefix: prefixes to output at the beginning of every write.
115    @ivar later_prefix: prefixes to output on each line after the first of
116                        every write.
117    """
118
119    bold = underline = reset = ''
120
121    def __init__(self, stream, width=79, encoding=None):
122        """Initialize.
123
124        @type  stream: file-like object.
125        @param stream: stream to output to.
126        @param width: maximum line width (defaults to 79).
127        @param encoding: encoding unicode strings are converted to.
128        """
129        Formatter.__init__(self)
130        self.stream = stream
131        if encoding is None:
132            encoding = getattr(self.stream, 'encoding', None)
133        if encoding is None:
134            try:
135                encoding = locale.getpreferredencoding()
136            except locale.Error:
137                encoding = 'ascii'
138        self.encoding = encoding
139        self.width = width
140        self._pos = 0
141        self._in_first_line = True
142        self._wrote_something = False
143        self.first_prefix = []
144        self.later_prefix = []
145
146
147    def _write_prefix(self, wrap):
148        if self._in_first_line:
149            prefix = self.first_prefix
150        else:
151            prefix = self.later_prefix
152        # This is a bit braindead since it duplicates a lot of code
153        # from write. Avoids fun things like word wrapped prefix though.
154
155        for thing in prefix:
156            while callable(thing):
157                thing = thing(self)
158            if thing is None:
159                continue
160            if not isinstance(thing, basestring):
161                thing = str(thing)
162            self._pos += len(thing)
163            if isinstance(thing, unicode):
164                thing = thing.encode(self.encoding, 'replace')
165            self.stream.write(thing)
166        if wrap and self._pos >= self.width:
167            # XXX What to do? Our prefix does not fit.
168            # This makes sure we still output something,
169            # but it is completely arbitrary.
170            self._pos = self.width - 10
171
172
173    def write(self, *args, **kwargs):
174        wrap = kwargs.get('wrap', self.wrap)
175        autoline = kwargs.get('autoline', self.autoline)
176        prefixes = kwargs.get('prefixes')
177        first_prefixes = kwargs.get('first_prefixes')
178        later_prefixes = kwargs.get('later_prefixes')
179        if prefixes is not None:
180            if first_prefixes is not None or later_prefixes is not None:
181                raise TypeError(
182                    'do not pass first_prefixes or later_prefixes '
183                    'if prefixes is passed')
184            first_prefixes = later_prefixes = prefixes
185        prefix = kwargs.get('prefix')
186        first_prefix = kwargs.get('first_prefix')
187        later_prefix = kwargs.get('later_prefix')
188        if prefix is not None:
189            if first_prefix is not None or later_prefix is not None:
190                raise TypeError(
191                    'do not pass first_prefix or later_prefix with prefix')
192            first_prefix = later_prefix = prefix
193        if first_prefix is not None:
194            if first_prefixes is not None:
195                raise TypeError(
196                    'do not pass both first_prefix and first_prefixes')
197            first_prefixes = (first_prefix,)
198        if later_prefix is not None:
199            if later_prefixes is not None:
200                raise TypeError(
201                    'do not pass both later_prefix and later_prefixes')
202            later_prefixes = (later_prefix,)
203        if first_prefixes is not None:
204            self.first_prefix.extend(first_prefixes)
205        if later_prefixes is not None:
206            self.later_prefix.extend(later_prefixes)
207        # Remove this nested try block once we depend on python 2.5
208        try:
209            try:
210                for arg in args:
211                    # If we're at the start of the line, write our prefix.
212                    # There is a deficiency here: if neither our arg nor our
213                    # prefix affect _pos (both are escape sequences or empty)
214                    # we will write prefix more than once. This should not
215                    # matter.
216                    if not self._pos:
217                        self._write_prefix(wrap)
218                    while callable(arg):
219                        arg = arg(self)
220                    if arg is None:
221                        continue
222                    if not isinstance(arg, basestring):
223                        arg = str(arg)
224                    is_unicode = isinstance(arg, unicode)
225                    while wrap and self._pos + len(arg) > self.width:
226                        # We have to split.
227                        maxlen = self.width - self._pos
228                        space = arg.rfind(' ', 0, maxlen)
229                        if space == -1:
230                            # No space to split on.
231
232                            # If we are on the first line we can simply go to
233                            # the next (this helps if the "later" prefix is
234                            # shorter and should not really matter if not).
235
236                            # If we are on the second line and have already
237                            # written something we can also go to the next
238                            # line.
239                            if self._in_first_line or self._wrote_something:
240                                bit = ''
241                            else:
242                                # Forcibly split this as far to the right as
243                                # possible.
244                                bit = arg[:maxlen]
245                                arg = arg[maxlen:]
246                        else:
247                            bit = arg[:space]
248                            # Omit the space we split on.
249                            arg = arg[space+1:]
250                        if is_unicode:
251                            bit = bit.encode(self.encoding, 'replace')
252                        self.stream.write(bit)
253                        self.stream.write('\n')
254                        self._pos = 0
255                        self._in_first_line = False
256                        self._wrote_something = False
257                        self._write_prefix(wrap)
258
259                    # This fits.
260                    self._wrote_something = True
261                    self._pos += len(arg)
262                    if is_unicode:
263                        arg = arg.encode(self.encoding, 'replace')
264                    self.stream.write(arg)
265                if autoline:
266                    self.stream.write('\n')
267                    self._wrote_something = False
268                    self._pos = 0
269                    self._in_first_line = True
270            except IOError, e:
271                if e.errno == errno.EPIPE:
272                    raise StreamClosed(e)
273                raise
274        finally:
275            if first_prefixes is not None:
276                self.first_prefix = self.first_prefix[:-len(first_prefixes)]
277            if later_prefixes is not None:
278                self.later_prefix = self.later_prefix[:-len(later_prefixes)]
279
280    def fg(self, color=None):
281        return ''
282
283    def bg(self, color=None):
284        return ''
285
# Use the implementation from the snakeoil._formatters extension module
# if it can be imported, otherwise the native classes defined above.
try:
    from snakeoil._formatters import PlainTextFormatter, StreamClosed
except ImportError:
    # Give the native class the public name before publishing it.
    native_PlainTextFormatter.__name__ = 'PlainTextFormatter'
    PlainTextFormatter = native_PlainTextFormatter
    StreamClosed = native_StreamClosed
else:
    class PlainTextFormatter(PlainTextFormatter, Formatter):
        # Combines the imported class with the Formatter base class and
        # adds the color methods, which return nothing for plain text.
        __doc__ = native_PlainTextFormatter.__doc__
        __slots__ = ()
        def fg(self, color=None):
            return ''
        bg = fg
299
300# This is necessary because the curses module is optional (and we
301# should run on a very minimal python for bootstrapping).
302try:
303    import curses
304except ImportError:
305    TerminfoColor = None
306else:
307    class TerminfoColor(object):
308
309        def __init__(self, mode, color):
310            self.mode = mode
311            self.color = color
312
313        def __call__(self, formatter):
314            if self.color is None:
315                formatter._current_colors[self.mode] = None
316                res = formatter._color_reset
317                # slight abuse of boolean True/False and 1/0 equivalence
318                other = formatter._current_colors[not self.mode]
319                if other is not None:
320                    res = res + other
321            else:
322                if self.mode == 0:
323                    default = curses.COLOR_WHITE
324                else:
325                    default = curses.COLOR_BLACK
326                color = formatter._colors.get(self.color, default)
327                # The curses module currently segfaults if handed a
328                # bogus template so check explicitly.
329                template = formatter._set_color[self.mode]
330                if template:
331                    res = curses.tparm(template, color)
332                else:
333                    res = ''
334                formatter._current_colors[self.mode] = res
335            formatter.stream.write(res)
336
337
338    class TerminfoCode(object):
339        def __init__(self, value):
340            self.value = value
341
342    class TerminfoMode(TerminfoCode):
343        def __call__(self, formatter):
344            formatter._modes.add(self)
345            formatter.stream.write(self.value)
346
347    class TerminfoReset(TerminfoCode):
348        def __call__(self, formatter):
349            formatter._modes.clear()
350            formatter.stream.write(self.value)
351
352
353    class TerminfoFormatter(PlainTextFormatter):
354
355        """Formatter writing to a tty, using terminfo to do colors."""
356
357        _colors = dict(
358            black = curses.COLOR_BLACK,
359            red = curses.COLOR_RED,
360            green = curses.COLOR_GREEN,
361            yellow = curses.COLOR_YELLOW,
362            blue = curses.COLOR_BLUE,
363            magenta = curses.COLOR_MAGENTA,
364            cyan = curses.COLOR_CYAN,
365            white = curses.COLOR_WHITE)
366
367        # Remapping of TERM setting to more capable equivalent.
368        # Mainly used to force on the hardstatus (aka title bar updates)
369        # capability for terminals that do not support this by default.
370        term_alternates = {
371            'xterm': 'xterm+sl',
372            'screen': 'screen-s',
373            }
374
375        def __init__(self, stream, term=None, forcetty=False, encoding=None):
376            """Initialize.
377
378            @type  stream: file-like object.
379            @param stream: stream to output to, defaulting to C{sys.stdout}.
380            @type  term: string.
381            @param term: terminal type, pulled from the environment if omitted.
382            @type  forcetty: bool
383            @param forcetty: force output of colors even if the wrapped stream
384                             is not a tty.
385            """
386            PlainTextFormatter.__init__(self, stream, encoding=encoding)
387            fd = stream.fileno()
388            if term is None:
389                # We only apply the remapping if we are guessing the
390                # terminal type from the environment. If we get a term
391                # type passed explicitly we just use it as-is (if the
392                # caller wants the remap just doing the
393                # term_alternates lookup there is easy enough.)
394                term_env = os.environ.get('TERM')
395                term_alt = self.term_alternates.get(term_env)
396                for term in (term_alt, term_env, 'dumb'):
397                    if term is not None:
398                        try:
399                            curses.setupterm(fd=fd, term=term)
400                        except curses.error:
401                            pass
402                        else:
403                            break
404                else:
405                    raise ValueError(
406                        'no terminfo entries, not even for "dumb"?')
407            else:
408                # TODO maybe do something more useful than raising curses.error
409                # if term is not in the terminfo db here?
410                curses.setupterm(fd=fd, term=term)
411            self.width = curses.tigetnum('cols')
412            self.reset = TerminfoReset(curses.tigetstr('sgr0'))
413            self.bold = TerminfoMode(curses.tigetstr('bold'))
414            self.underline = TerminfoMode(curses.tigetstr('smul'))
415            self._color_reset = curses.tigetstr('op')
416            self._set_color = (
417                curses.tigetstr('setaf'), curses.tigetstr('setab'))
418            # [fg, bg]
419            self._current_colors = [None, None]
420            self._modes = set()
421            self._pos = 0
422
423        def fg(self, color=None):
424            return TerminfoColor(0, color)
425
426        def bg(self, color=None):
427            return TerminfoColor(1, color)
428
429        def write(self, *args, **kwargs):
430            PlainTextFormatter.write(self, *args, **kwargs)
431            try:
432                if self._modes:
433                    self.reset(self)
434                if self._current_colors != [None, None]:
435                    self._current_colors = [None, None]
436                    self.stream.write(self._color_reset)
437            except IOError, e:
438                if e.errno == errno.EPIPE:
439                    raise StreamClosed(e)
440                raise
441
442        def title(self, string):
443            # I want to use curses.tigetflag('hs') here but at least
444            # the screen-s entry defines a tsl and fsl string but does
445            # not set the hs flag. So just check for the ability to
446            # jump to and out of the status line, without checking if
447            # the status line we're using exists.
448            if curses.tigetstr('tsl') and curses.tigetstr('fsl'):
449                self.stream.write(
450                    curses.tigetstr('tsl') + string + curses.tigetstr('fsl'))
451                self.stream.flush()
452
453
class ObserverFormatter(object):

    """Formatter wrapper whose write calls never add a newline.

    Attributes not found on the wrapper are looked up through
    C{GetAttrProxy} on the wrapped formatter.
    """

    __getattr__ = GetAttrProxy("_formatter")

    def __init__(self, real_formatter):
        self._formatter = real_formatter

    def write(self, *args):
        """Pass args to the wrapped formatter's write with autoline off."""
        target = self._formatter
        target.write(*args, **{'autoline': False})
463
464
def get_formatter(stream):
    """TerminfoFormatter if the stream is a tty, else PlainTextFormatter.

    The plain formatter is also used if curses is not available, if the
    stream has no C{fileno}, or if setting up terminfo fails.
    """
    if TerminfoColor is not None:
        try:
            fd = stream.fileno()
        except AttributeError:
            # No file descriptor, so this cannot be a terminal.
            pass
        else:
            # We do this instead of stream.isatty() because TerminfoFormatter
            # needs an fd to pass to curses, not just a filelike talking to
            # a tty.
            if os.isatty(fd):
                try:
                    return TerminfoFormatter(stream)
                except curses.error:
                    # This happens if TERM is unset and possibly in more
                    # cases. Just fall back to the PlainTextFormatter.
                    pass
    return PlainTextFormatter(stream)
484
485
def decorate_forced_wrapping(setting=True):
    """Return a decorator that forces a formatter's wrap attribute.

    The decorated function must take the formatter as its first
    argument. For the duration of every call the formatter's C{wrap}
    attribute is set to C{setting}; the old value is put back when the
    call ends, whether it returns or raises.

    @param setting: value for C{wrap} during the call (defaults to True).
    """
    def decorator(func):
        def f(out, *args, **kwds):
            saved = out.wrap
            out.wrap = setting
            try:
                result = func(out, *args, **kwds)
            finally:
                out.wrap = saved
            return result
        return f
    return decorator
Note: See TracBrowser for help on using the browser.