root/masterdriverz/snakeoil-formatters/snakeoil/formatters.py @ masterdriverz%2540gentoo.org-20070520172117-kd4ryy9mcqx8y2pn

Revision masterdriverz%2540gentoo.org-20070520172117-kd4ryy9mcqx8y2pn, 17.9 kB (checked in by Charlie Shepherd <masterdriverz@…>, 20 months ago)

Fix importing and actually add StreamClosed to the namespace

Line 
1# Copyright: 2006 Marien Zwart <marienz@gentoo.org>
2# License: GPL2
3
4"""Classes wrapping a file-like object to do fancy output on it."""
5
6import os
7import errno
8
9from snakeoil.klass import GetAttrProxy
10from snakeoil.demandload import demandload
11demandload(globals(), 'locale')
12
13
class native_StreamClosed(KeyboardInterrupt):
    """Signals that the stream a formatter prints to has been closed.

    L{Formatter.write} implementations raise this. The typical trigger
    is the user quitting a pager while the script is still producing
    output; that should be treated like control+c rather than like an
    error, which is why the base class is C{KeyboardInterrupt}.
    """
22
23
24# "Invalid name" (for fg and bg methods, too short)
25# pylint: disable-msg=C0103
26
27
class Formatter(object):

    """Base class describing the formatter interface.

    Most instance attributes have no fixed type; what they hold is up
    to the concrete Formatter subclass.

    @ivar bold: pass this to L{write} to start bold output.
    @ivar underline: pass this to L{write} to start underlined output.
    @ivar reset: pass this to L{write} to leave bold and underline mode.
    @ivar wrap: boolean, true if lines are wrapped automatically
        (off by default).
    @ivar autoline: boolean, true if a newline is added automatically
        (on by default).
    """

    def __init__(self):
        self.wrap = False
        self.autoline = True

    def write(self, *args, **kwargs):
        """Send output to the stream.

        Positional arguments are handled as follows:
          - Strings go to the stream unchanged.
          - C{None} is skipped.
          - Functions get called with the formatter as their only
            argument; whatever they return is then processed like any
            other argument.
          - Subclasses may give special treatment to other objects.

        The wrap and autoline keyword arguments act as if the
        attribute of the same name had been set before the call and
        restored after it.

        The first_prefixes and later_prefixes keyword arguments are
        sequences that get appended to the first_prefix and
        later_prefix attributes for the duration of the call.

        The prefixes keyword argument is shorthand for passing the
        same value as both first_prefixes and later_prefixes.

        The first_prefix, later_prefix and prefix keyword arguments
        are shorthand for passing a one-element tuple as
        first_prefixes, later_prefixes or prefixes.

        Several attributes of the formatter are meant to be passed to
        write as arguments.
        """

    def fg(self, color=None):
        """Switch the foreground color.

        @type  color: a string or C{None}.
        @param color: the color to switch to. If left out a default is
                      used; C{None} goes back to the default color.
        """

    def bg(self, color=None):
        """Switch the background color.

        @type  color: a string or C{None}.
        @param color: the color to switch to. If left out a default is
                      used; C{None} goes back to the default color.
        """

    def error(self, message):
        """Write message marked up as an error."""
        marker = (self.fg('red'), self.bold, '!!! ', self.reset)
        self.write(message, prefixes=marker)

    def warn(self, message):
        """Write message marked up as a warning."""
        marker = (self.fg('yellow'), self.bold, '*** ', self.reset)
        self.write(message, prefixes=marker)

    def title(self, string):
        """Hook for showing a title; this base implementation ignores it."""
105
106class native_PlainTextFormatter(Formatter):
107
108    """Formatter writing plain text to a file-like object.
109
110    @ivar width: contains the current maximum line length.
111    """
112
113    bold = underline = reset = ''
114
115    def __init__(self, stream, width=79, encoding=None):
116        """Initialize.
117
118        @type  stream: file-like object.
119        @param stream: stream to output to.
120        @param width: maximum line width.
121        """
122        Formatter.__init__(self)
123        self.stream = stream
124        if encoding is None:
125            encoding = getattr(self.stream, 'encoding', None)
126        if encoding is None:
127            try:
128                encoding = locale.getpreferredencoding()
129            except locale.Error:
130                encoding = 'ascii'
131        self.encoding = encoding
132        self.width = width
133        self._pos = 0
134        self._in_first_line = True
135        self._wrote_something = False
136        self.first_prefix = []
137        self.later_prefix = []
138
139
140    def _write_prefix(self, wrap):
141        if self._in_first_line:
142            prefix = self.first_prefix
143        else:
144            prefix = self.later_prefix
145        # This is a bit braindead since it duplicates a lot of code
146        # from write. Avoids fun things like word wrapped prefix though.
147
148        for thing in prefix:
149            while callable(thing):
150                thing = thing(self)
151            if thing is None:
152                continue
153            if not isinstance(thing, basestring):
154                thing = str(thing)
155            self._pos += len(thing)
156            if isinstance(thing, unicode):
157                thing = thing.encode(self.encoding, 'replace')
158            self.stream.write(thing)
159        if wrap and self._pos >= self.width:
160            # XXX What to do? Our prefix does not fit.
161            # This makes sure we still output something,
162            # but it is completely arbitrary.
163            self._pos = self.width - 10
164
165
166    def write(self, *args, **kwargs):
167        wrap = kwargs.get('wrap', self.wrap)
168        autoline = kwargs.get('autoline', self.autoline)
169        prefixes = kwargs.get('prefixes')
170        first_prefixes = kwargs.get('first_prefixes')
171        later_prefixes = kwargs.get('later_prefixes')
172        if prefixes is not None:
173            if first_prefixes is not None or later_prefixes is not None:
174                raise TypeError(
175                    'do not pass first_prefixes or later_prefixes '
176                    'if prefixes is passed')
177            first_prefixes = later_prefixes = prefixes
178        prefix = kwargs.get('prefix')
179        first_prefix = kwargs.get('first_prefix')
180        later_prefix = kwargs.get('later_prefix')
181        if prefix is not None:
182            if first_prefix is not None or later_prefix is not None:
183                raise TypeError(
184                    'do not pass first_prefix or later_prefix with prefix')
185            first_prefix = later_prefix = prefix
186        if first_prefix is not None:
187            if first_prefixes is not None:
188                raise TypeError(
189                    'do not pass both first_prefix and first_prefixes')
190            first_prefixes = (first_prefix,)
191        if later_prefix is not None:
192            if later_prefixes is not None:
193                raise TypeError(
194                    'do not pass both later_prefix and later_prefixes')
195            later_prefixes = (later_prefix,)
196        if first_prefixes is not None:
197            self.first_prefix.extend(first_prefixes)
198        if later_prefixes is not None:
199            self.later_prefix.extend(later_prefixes)
200        # Remove this nested try block once we depend on python 2.5
201        try:
202            try:
203                for arg in args:
204                    # If we're at the start of the line, write our prefix.
205                    # There is a deficiency here: if neither our arg nor our
206                    # prefix affect _pos (both are escape sequences or empty)
207                    # we will write prefix more than once. This should not
208                    # matter.
209                    if not self._pos:
210                        self._write_prefix(wrap)
211                    while callable(arg):
212                        arg = arg(self)
213                    if arg is None:
214                        continue
215                    if not isinstance(arg, basestring):
216                        arg = str(arg)
217                    is_unicode = isinstance(arg, unicode)
218                    while wrap and self._pos + len(arg) > self.width:
219                        # We have to split.
220                        maxlen = self.width - self._pos
221                        space = arg.rfind(' ', 0, maxlen)
222                        if space == -1:
223                            # No space to split on.
224
225                            # If we are on the first line we can simply go to
226                            # the next (this helps if the "later" prefix is
227                            # shorter and should not really matter if not).
228
229                            # If we are on the second line and have already
230                            # written something we can also go to the next
231                            # line.
232                            if self._in_first_line or self._wrote_something:
233                                bit = ''
234                            else:
235                                # Forcibly split this as far to the right as
236                                # possible.
237                                bit = arg[:maxlen]
238                                arg = arg[maxlen:]
239                        else:
240                            bit = arg[:space]
241                            # Omit the space we split on.
242                            arg = arg[space+1:]
243                        if is_unicode:
244                            bit = bit.encode(self.encoding, 'replace')
245                        self.stream.write(bit)
246                        self.stream.write('\n')
247                        self._pos = 0
248                        self._in_first_line = False
249                        self._wrote_something = False
250                        self._write_prefix(wrap)
251
252                    # This fits.
253                    self._wrote_something = True
254                    self._pos += len(arg)
255                    if is_unicode:
256                        arg = arg.encode(self.encoding, 'replace')
257                    self.stream.write(arg)
258                if autoline:
259                    self.stream.write('\n')
260                    self._wrote_something = False
261                    self._pos = 0
262                    self._in_first_line = True
263            except IOError, e:
264                if e.errno == errno.EPIPE:
265                    raise StreamClosed(e)
266                raise
267        finally:
268            if first_prefixes is not None:
269                self.first_prefix = self.first_prefix[:-len(first_prefixes)]
270            if later_prefixes is not None:
271                self.later_prefix = self.later_prefix[:-len(later_prefixes)]
272
273    def fg(self, color=None):
274        return ''
275
276    def bg(self, color=None):
277        return ''
278
# Use PlainTextFormatter and StreamClosed from the optional
# snakeoil._formatters module when it can be imported (presumably a
# compiled extension -- TODO confirm); otherwise fall back to the
# native_* implementations defined above.
try:
    from snakeoil._formatters import PlainTextFormatter, StreamClosed
except ImportError:
    PlainTextFormatter = native_PlainTextFormatter
    StreamClosed = native_StreamClosed
284
285# This is necessary because the curses module is optional (and we
286# should run on a very minimal python for bootstrapping).
287try:
288    import curses
289except ImportError:
290    TerminfoColor = None
291else:
292    class TerminfoColor(object):
293
294        def __init__(self, mode, color):
295            self.mode = mode
296            self.color = color
297
298        def __call__(self, formatter):
299            if self.color is None:
300                formatter._current_colors[self.mode] = None
301                res = formatter._color_reset
302                # slight abuse of boolean True/False and 1/0 equivalence
303                other = formatter._current_colors[not self.mode]
304                if other is not None:
305                    res = res + other
306            else:
307                if self.mode == 0:
308                    default = curses.COLOR_WHITE
309                else:
310                    default = curses.COLOR_BLACK
311                color = formatter._colors.get(self.color, default)
312                # The curses module currently segfaults if handed a
313                # bogus template so check explicitly.
314                template = formatter._set_color[self.mode]
315                if template:
316                    res = curses.tparm(template, color)
317                else:
318                    res = ''
319                formatter._current_colors[self.mode] = res
320            formatter.stream.write(res)
321
322
323    class TerminfoCode(object):
324        def __init__(self, value):
325            self.value = value
326
327    class TerminfoMode(TerminfoCode):
328        def __call__(self, formatter):
329            formatter._modes.add(self)
330            formatter.stream.write(self.value)
331
332    class TerminfoReset(TerminfoCode):
333        def __call__(self, formatter):
334            formatter._modes.clear()
335            formatter.stream.write(self.value)
336
337
338    class TerminfoFormatter(PlainTextFormatter):
339
340        """Formatter writing to a tty, using terminfo to do colors."""
341
342        _colors = dict(
343            black = curses.COLOR_BLACK,
344            red = curses.COLOR_RED,
345            green = curses.COLOR_GREEN,
346            yellow = curses.COLOR_YELLOW,
347            blue = curses.COLOR_BLUE,
348            magenta = curses.COLOR_MAGENTA,
349            cyan = curses.COLOR_CYAN,
350            white = curses.COLOR_WHITE)
351
352        # Remapping of TERM setting to more capable equivalent.
353        # Mainly used to force on the hardstatus (aka title bar updates)
354        # capability for terminals that do not support this by default.
355        term_alternates = {
356            'xterm': 'xterm+sl',
357            'screen': 'screen-s',
358            }
359
360        def __init__(self, stream, term=None, forcetty=False, encoding=None):
361            """Initialize.
362
363            @type  stream: file-like object.
364            @param stream: stream to output to, defaulting to C{sys.stdout}.
365            @type  term: string.
366            @param term: terminal type, pulled from the environment if omitted.
367            @type  forcetty: bool
368            @param forcetty: force output of colors even if the wrapped stream
369                             is not a tty.
370            """
371            PlainTextFormatter.__init__(self, stream, encoding=encoding)
372            fd = stream.fileno()
373            if term is None:
374                # We only apply the remapping if we are guessing the
375                # terminal type from the environment. If we get a term
376                # type passed explicitly we just use it as-is (if the
377                # caller wants the remap just doing the
378                # term_alternates lookup there is easy enough.)
379                term_env = os.environ.get('TERM')
380                term_alt = self.term_alternates.get(term_env)
381                for term in (term_alt, term_env, 'dumb'):
382                    if term is not None:
383                        try:
384                            curses.setupterm(fd=fd, term=term)
385                        except curses.error:
386                            pass
387                        else:
388                            break
389                else:
390                    raise ValueError(
391                        'no terminfo entries, not even for "dumb"?')
392            else:
393                # TODO maybe do something more useful than raising curses.error
394                # if term is not in the terminfo db here?
395                curses.setupterm(fd=fd, term=term)
396            self.width = curses.tigetnum('cols')
397            self.reset = TerminfoReset(curses.tigetstr('sgr0'))
398            self.bold = TerminfoMode(curses.tigetstr('bold'))
399            self.underline = TerminfoMode(curses.tigetstr('smul'))
400            self._color_reset = curses.tigetstr('op')
401            self._set_color = (
402                curses.tigetstr('setaf'), curses.tigetstr('setab'))
403            self._width = curses.tigetstr('cols')
404            # [fg, bg]
405            self._current_colors = [None, None]
406            self._modes = set()
407            self._pos = 0
408
409        def fg(self, color=None):
410            return TerminfoColor(0, color)
411
412        def bg(self, color=None):
413            return TerminfoColor(1, color)
414
415        def write(self, *args, **kwargs):
416            PlainTextFormatter.write(self, *args, **kwargs)
417            try:
418                if self._modes:
419                    self.reset(self)
420                if self._current_colors != [None, None]:
421                    self._current_colors = [None, None]
422                    self.stream.write(self._color_reset)
423            except IOError, e:
424                if e.errno == errno.EPIPE:
425                    raise StreamClosed(e)
426                raise
427
428        def title(self, string):
429            # I want to use curses.tigetflag('hs') here but at least
430            # the screen-s entry defines a tsl and fsl string but does
431            # not set the hs flag. So just check for the ability to
432            # jump to and out of the status line, without checking if
433            # the status line we're using exists.
434            if curses.tigetstr('tsl') and curses.tigetstr('fsl'):
435                self.stream.write(
436                    curses.tigetstr('tsl') + string + curses.tigetstr('fsl'))
437                self.stream.flush()
438
439
class ObserverFormatter(object):

    """Proxy around a formatter whose write never adds a newline.

    Attributes not defined here are looked up on the wrapped formatter.
    """

    def __init__(self, real_formatter):
        """Wrap real_formatter."""
        self._formatter = real_formatter

    def write(self, *args):
        """Pass args to the wrapped formatter's write with autoline off."""
        target = self._formatter
        target.write(autoline=False, *args)

    __getattr__ = GetAttrProxy("_formatter")
449
450
def get_formatter(stream):
    """TerminfoFormatter if the stream is a tty, else PlainTextFormatter.

    @type  stream: file-like object.
    @param stream: stream the returned formatter writes to.
    @return: a formatter wrapping stream. PlainTextFormatter is used if
        curses is unavailable, if the stream has no usable file
        descriptor, if that descriptor is not a tty, or if setting up
        terminfo fails with C{curses.error}.
    """
    if TerminfoColor is None:
        # The curses import failed, so there is no TerminfoFormatter.
        return PlainTextFormatter(stream)
    try:
        fd = stream.fileno()
    except (AttributeError, ValueError):
        # AttributeError: the object has no fileno method at all.
        # ValueError: it has one but no descriptor behind it (closed
        # files, and in-memory io streams whose UnsupportedOperation
        # derives from ValueError).
        return PlainTextFormatter(stream)
    # We do this instead of stream.isatty() because TerminfoFormatter
    # needs an fd to pass to curses, not just a filelike talking to a tty.
    if os.isatty(fd):
        try:
            return TerminfoFormatter(stream)
        except curses.error:
            # This happens if TERM is unset and possibly in more cases.
            # Just fall back to the PlainTextFormatter.
            pass
    return PlainTextFormatter(stream)
470
471
def decorate_forced_wrapping(setting=True):
    """Return a decorator forcing C{out.wrap} while the function runs.

    The decorated function must take the formatter as its first
    positional argument. Its C{wrap} attribute is set to setting for
    the duration of the call and restored afterwards, even if the
    function raises.

    @param setting: value assigned to C{out.wrap} during the call.
    """
    def wrapped_func(func):
        def f(out, *args, **kwds):
            oldwrap = out.wrap
            out.wrap = setting
            try:
                return func(out, *args, **kwds)
            finally:
                out.wrap = oldwrap
        # Make the wrapper look like the function it wraps. Done by hand
        # rather than with functools.wraps, which python 2.4 lacks.
        for attr in ('__module__', '__name__', '__doc__'):
            try:
                setattr(f, attr, getattr(func, attr))
            except (AttributeError, TypeError):
                # Not every callable has (or allows copying) these.
                pass
        f.__dict__.update(getattr(func, '__dict__', {}))
        return f
    return wrapped_func
Note: See TracBrowser for help on using the browser.