emacs/mps/tool/monitor
Gareth Rees e323bd2e15 New events arenapollbegin and arenapollend.
Measure fraction of CPU time in polling.
Represent the units of each TimeSeries.
Draw graphs for up to two different units using Axes.twinx.

Copied from Perforce
 Change: 194204
2018-06-26 16:21:48 +01:00

589 lines
21 KiB
Python
Executable file

#!/usr/bin/env python
#
# $Id$
# Copyright (c) 2018 Ravenbrook Limited. See end of file for license.
#
# Read a telemetry stream from a program using the MPS, construct a
# model of the MPS data structures in the program, and display selected
# time series from the model in a graphical user interface.
#
# Requirements: Python 3.6, Matplotlib, PyQt5.
import argparse
from collections import defaultdict, deque, namedtuple
from itertools import cycle
import os
from struct import Struct
import sys
import time
from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets
from matplotlib.backends.backend_qt5agg import (
FigureCanvas, NavigationToolbar2QT as NavigationToolbar)
from matplotlib.figure import Figure
import mpsevent
# Event code -> namedtuple class for that event. The first field is always
# 'header'; the remaining fields are named after the event's parameters.
EVENT_NAMEDTUPLE = {
    event_code: namedtuple(
        event_desc.name,
        ['header'] + [param.name for param in event_desc.params])
    for event_code, event_desc in mpsevent.EVENT.items()
}

# Event code -> event name.
EVENT_NAME = {
    event_code: event_desc.name
    for event_code, event_desc in mpsevent.EVENT.items()
}

# Function unpacking the bytes of an event header.
HEADER_UNPACK = Struct(mpsevent.HEADER_FORMAT).unpack

# Event code -> function unpacking the bytes that follow the header of an
# event with that code.
EVENT_UNPACK = {
    event_code: Struct(event_desc.format).unpack
    for event_code, event_desc in mpsevent.EVENT.items()
}

# Absolute path of the toolbar pause button icon, which lives in the same
# directory as this script.
PAUSE_ICON = os.path.abspath(
    os.path.join(os.path.dirname(__file__), 'pause'))
def telemetry_decoder(read):
    """Return a function that decodes the events in an I/O stream and
    generates pairs (time, event) in time order, where time is CPU time
    in seconds and event is a namedtuple whose first field is the event
    header.

    Unknown event codes are read but ignored.

    The 'read' argument must be a function implementing the
    io.RawIOBase.read specification (that is, it takes a size and
    returns up to size bytes from the I/O stream).

    The returned function can be called multiple times (for example, if
    the underlying file is non-blocking then a call to read might raise
    BlockingIOError): the pending batch of events and the clock
    synchronization state are kept between calls.

    Raises RuntimeError (from the returned generator) if the telemetry
    stream's major/median version differs from mpsevent's.

    """
    # Cache frequently-used values in local variables.
    header_desc = mpsevent.HeaderDesc
    header_size = mpsevent.HEADER_SIZE
    event_dict = mpsevent.EVENT
    event_namedtuple = EVENT_NAMEDTUPLE
    event_unpack = EVENT_UNPACK
    header_unpack = HEADER_UNPACK
    EventClockSync_code = mpsevent.Event.EventClockSync.code
    EventInit_code = mpsevent.Event.EventInit.code

    # Special handling for Intern events: a fixed-size part followed by
    # a variable-length, NUL-padded string.
    Intern_desc = mpsevent.Event.Intern
    Intern_code = Intern_desc.code
    Intern_struct = Struct(Intern_desc.format)
    Intern_size = Intern_struct.size
    Intern_unpack = Intern_struct.unpack
    Intern_namedtuple = event_namedtuple[Intern_code]

    batch = []                  # Current batch of (unordered) events.
    eventclocks = deque(maxlen=2)  # Eventclocks from last two EventClockSync.
    clocks = deque(maxlen=2)    # Clocks from last two EventClockSync.
    clocks_per_sec = None       # CLOCKS_PER_SEC value from EventInit event.

    def key(event):
        "Key function for sorting events into time order."
        return event.header.clock

    def decoder():
        nonlocal clocks_per_sec
        while True:
            header_data = read(header_size)
            if not header_data:
                break
            header = header_desc(*header_unpack(header_data))
            code = header.code
            size = header.size - header_size
            if code == Intern_code:
                event_desc = event_dict[code]
                assert size <= event_desc.maxsize
                event = Intern_namedtuple(
                    header,
                    *Intern_unpack(read(Intern_size)),
                    read(size - Intern_size).rstrip(b'\0'))
            elif code in event_dict:
                event_desc = event_dict[code]
                assert size == event_desc.maxsize
                event = event_namedtuple[code](
                    header, *event_unpack[code](read(size)))
            else:
                # Unknown code might indicate a new event added since
                # mpsevent.py was updated, so just read and ignore.
                read(size)
                # There is no decoded event for this header, so skip the
                # batching below; otherwise 'event' would be unbound (or
                # would still refer to the previous event, which would be
                # appended to the batch a second time).
                continue
            batch.append(event)
            if event.header.code == EventClockSync_code:
                # Events are output in batches terminated by an EventClockSync
                # event. So when we see an EventClockSync event, we know that
                # we've received all events up to that one and can sort and
                # emit the batch.
                #
                # The Time Stamp Counter frequency can vary due to thermal
                # throttling, turbo boost etc., so linearly interpolate within
                # each batch to convert to clocks and thence to seconds. (This
                # requires at least two EventClockSync events.)
                #
                # In theory the Time Stamp Counter can wrap around, but it is
                # a 64-bit register even on IA-32, and at 2.5 GHz it will take
                # hundreds of years to do so, so we ignore this possibility.
                #
                # TODO: on 32-bit platforms at 1 MHz, clock values will wrap
                # around in about 72 minutes and so this needs to be handled.
                eventclocks.append(event.header.clock)
                clocks.append(event.clock)
                if len(clocks) == 2:
                    batch.sort(key=key)
                    # Solve for: seconds = m * eventclocks + c
                    m = ((clocks[1] - clocks[0])
                         / ((eventclocks[1] - eventclocks[0]) * clocks_per_sec))
                    c = clocks[0] / clocks_per_sec - m * eventclocks[0]
                    for e in batch:
                        yield m * e.header.clock + c, e
                    batch.clear()
            elif event.header.code == EventInit_code:
                stream_version = event.major, event.median, event.minor
                if stream_version[:2] != mpsevent.__version__[:2]:
                    raise RuntimeError(
                        "Monitor version {} is incompatible with "
                        "telemetry stream version {}.".format(
                            '.'.join(map(str, mpsevent.__version__)),
                            '.'.join(map(str, stream_version))))
                clocks_per_sec = event.clocksPerSec

    return decoder
class TimeSeries:
    "Samples (t, y) stored as two parallel lists, in non-decreasing time order."

    def __init__(self):
        self.t, self.y = [], []

    def append(self, t, y):
        "Record the value y at time t; t must not precede the last sample."
        times = self.t
        if times:
            assert t >= times[-1]
        times.append(t)
        self.y.append(y)
class Accumulator(TimeSeries):
    """Time series of a running total that never goes negative.

    Every change records two samples at the same time, the old total and
    then the new total, so the series has a vertical step at each change.

    """

    def __init__(self, initial=0):
        super().__init__()
        self.value = initial    # Current total.

    def _step(self, t, new_value):
        "Record the current total and then new_value, both at time t."
        self.append(t, self.value)
        self.value = new_value
        self.append(t, new_value)

    def add(self, t, delta):
        "Increase the total by delta at time t."
        self._step(t, self.value + delta)

    def sub(self, t, delta):
        "Decrease the total by delta at time t; delta must not exceed it."
        assert self.value >= delta
        self._step(t, self.value - delta)
class MovingAverageRatio(TimeSeries):
    """Exponentially weighted moving average on/off ratio.

    Call on(t) when the measured activity starts and off(t) when it
    stops. Each call to off() computes the fraction of the most recent
    off+on cycle that was spent "on", blends it into the moving average
    (the old average is weighted by alpha, the new sample by 1 - alpha),
    and appends the average to the series at time t.

    """

    def __init__(self, t, alpha=0.99):
        "Create the series starting at time t with smoothing factor alpha."
        super().__init__()
        self._on = self._off = 0.0  # Durations of the latest on/off periods.
        self._last = t              # Time of the most recent transition.
        self._alpha = alpha         # Weight of the previous average.
        self._beta = 1 - alpha      # Weight of the newest sample.
        self._ratio = 0.0           # Current moving average.

    def off(self, t):
        "Record a transition from on to off at time t."
        self._on = t - self._last
        self._last = t
        total = self._on + self._off
        if total == 0:
            # Zero-length cycle (for example, the first off() arriving at
            # the creation time, or coincident timestamps): the ratio is
            # undefined, so skip the sample instead of dividing by zero.
            return
        ratio = self._on / total
        self._ratio = self._ratio * self._alpha + ratio * self._beta
        self.append(t, self._ratio)

    def on(self, t):
        "Record a transition from off to on at time t."
        self._off = t - self._last
        self._last = t
class EventHandler:
    """Base class for objects that process telemetry events.

    An event is dispatched to the method named after the event; events
    for which the object has no such method are silently ignored.

    """

    def ignore(self, t, event):
        "Fallback handler: do nothing with the event at time t."

    def handle(self, t, event):
        "Dispatch the event at time t to the method named after it."
        method_name = EVENT_NAME[event.header.code]
        handler = getattr(self, method_name, self.ignore)
        handler(t, event)
class Pool(EventHandler):
    "Model of a single MPS pool."

    def __init__(self, arena, pointer, t):
        """Create the model of the pool at pointer, owned by arena.

        The creation time t is accepted for symmetry with Arena but is
        not used here.

        """
        self._arena = arena         # Owning arena.
        self._model = arena.model   # Owning model.
        self._pointer = pointer     # Pool's pointer.
        self._pool_class = None     # Pool's class pointer (set by PoolInit).
        self._serial = None         # Serial within arena (set by PoolInit).
        self._alloc = Accumulator()
        self._model.add_time_series(
            self, self._alloc, "bytes", "alloc",
            "memory allocated by the pool from the arena")

    @property
    def name(self):
        """Display name: the arena's name, a dot, then the pool's label
        if it has one, otherwise "<class>[<serial>]", falling back to
        the pointer in hexadecimal when the serial is not yet known.
        """
        label = self._model.label(self._pointer)
        if label:
            return f"{self._arena.name}.{label}"
        class_name = self._model.label(self._pool_class) or 'Pool'
        if self._serial is None:
            identifier = f"{self._pointer:x}"
        else:
            identifier = self._serial
        return f"{self._arena.name}.{class_name}[{identifier}]"

    def ArenaAlloc(self, t, event):
        "The pool obtained event.size bytes from the arena."
        self._alloc.add(t, event.size)

    def ArenaFree(self, t, event):
        "The pool returned event.size bytes to the arena."
        self._alloc.sub(t, event.size)

    def PoolInit(self, t, event):
        "Remember the pool's class pointer and serial number."
        self._pool_class = event.poolClass
        self._serial = event.serial
class Arena(EventHandler):
    "Model of a single MPS arena."

    def __init__(self, model, pointer, t):
        "Create the model of the arena at pointer, owned by model, at time t."
        self.model = model          # Owning model.
        self._pointer = pointer     # Arena's pointer.
        self._arena_class = None    # Arena's class pointer.
        self._serial = None         # Arena's serial number.
        self._pools = []            # Every Pool that ever belonged to arena.
        self._pool = {}             # pointer -> Pool (live pools only).
        self._poll = MovingAverageRatio(t)
        self.model.add_time_series(
            self, self._poll, "fraction", "poll",
            "polling fraction of CPU time (moving average)")

    @property
    def name(self):
        """Display name: empty while the model has at most one arena,
        otherwise the arena's label if it has one, otherwise
        "<class>[<serial>]", falling back to the pointer in hexadecimal
        when the serial is not yet known.
        """
        if len(self.model.arenas) < 2:
            # No need to distinguish arenas if there's just one.
            return ""
        label = self.model.label(self._pointer)
        if label:
            return label
        class_name = self.model.label(self._arena_class) or 'Arena'
        if self._serial is None:
            identifier = f"{self._pointer:x}"
        else:
            identifier = self._serial
        return f"{class_name}[{identifier}]"

    def delegate_to_pool(self, t, event):
        """Pass the event on to the model of the pool it concerns,
        creating that model first if the pool has not been seen before.
        """
        pointer = event.pool
        pool = self._pool.get(pointer)
        if pool is None:
            pool = Pool(self, pointer, t)
            self._pool[pointer] = pool
            self._pools.append(pool)
        pool.handle(t, event)

    ArenaAlloc = ArenaFree = PoolInit = delegate_to_pool

    def ArenaCreateVM(self, t, event):
        "Remember the arena's class pointer and serial number."
        self._arena_class = event.arenaClass
        self._serial = event.serial

    ArenaCreateCL = ArenaCreateVM

    def PoolFinish(self, t, event):
        "Forget the live pool; it stays in the list of all pools."
        del self._pool[event.pool]

    def ArenaPollBegin(self, t, event):
        "Polling started at time t."
        self._poll.on(t)

    def ArenaPollEnd(self, t, event):
        "Polling finished at time t."
        self._poll.off(t)
class Line:
    "One plottable line: a TimeSeries plus its label, unit and colour."

    # Shared colour cycle: each new Line takes the next colour in turn.
    colors = cycle(
        'blue orange green red purple brown pink gray olive cyan'.split())

    def __init__(self, owner, series, unit, name, desc):
        self.owner = owner      # Owning object (provides a 'name').
        self.series = series    # Time series.
        self.unit = unit        # Unit of the series' values.
        self._name = name       # Short name, unqualified by owner.
        self.desc = desc        # Brief description.
        self.draw = True        # Plot this line?
        self.color = next(self.colors)

    @property
    def name(self):
        "Short name qualified by the owner's (possibly changing) name."
        return f"{self.owner.name}.{self._name}"

    @property
    def ready(self):
        "True once the series has at least two samples."
        return len(self.series.t) > 1

    def plot(self, axes):
        "Draw the line on axes if it is ready and enabled for drawing."
        if not (self.ready and self.draw):
            return
        series = self.series
        axes.plot(series.t, series.y, color=self.color, label=self.name)
class Model(EventHandler):
    "Model of an application using the MPS, built from its telemetry."

    def __init__(self, events):
        """Create a model fed by 'events', a function that returns an
        iterable of (time, event) pairs each time it is called.
        """
        self._events = events
        self._intern = {}           # stringId -> string
        self._label = {}            # address or pointer -> stringId
        self._arena = {}            # pointer -> Arena (live arenas only)
        self.arenas = []            # Every arena ever created in the model.
        self.lines = []             # Every Line available for plotting.
        self._unit_lines = defaultdict(list)    # unit -> Lines with that unit
        self._needs_redraw = True   # Plot needs redrawing?

    def add_time_series(self, *args):
        "Wrap a time series in a Line (same arguments) and register it."
        new_line = Line(*args)
        self.lines.append(new_line)
        self._unit_lines[new_line.unit].append(new_line)

    def label(self, pointer):
        "Return string labelling address or pointer, or None if unlabelled."
        string_id = self._label.get(pointer)
        return self._intern.get(string_id)

    def plot(self, axes_list):
        """Redraw the time series on the given axes, if a redraw has
        been requested since the last call.
        """
        if not self._needs_redraw:
            return
        self._needs_redraw = False

        # One unit per axes: units with the most lines are assigned first,
        # and zip() drops any units left over once the axes run out.
        lines_by_unit = sorted(
            self._unit_lines.values(), key=len, reverse=True)
        for axes in axes_list:
            axes.clear()
            axes.set_xlabel("time (seconds)")
        for axes, unit_lines in zip(axes_list, lines_by_unit):
            axes.set_ylabel(unit_lines[0].unit)
            for line in unit_lines:
                line.plot(axes)
        # 'axes' is whichever axes the loops above visited last; its
        # figure's canvas is redrawn once.
        axes.figure.canvas.draw()

    def update(self):
        "Consume the telemetry events available now and update the model."
        try:
            for t, event in self._events():
                self.handle(t, event)
        except BlockingIOError:
            # No more data available for the moment; try again next time.
            pass

    def needs_redraw(self):
        "Request that the next call to plot() actually redraws."
        self._needs_redraw = True

    def delegate_to_arena(self, t, event):
        """Pass the event on to the model of the arena it concerns,
        creating that model first if the arena has not been seen before.
        """
        pointer = event.arena
        arena = self._arena.get(pointer)
        if arena is None:
            arena = Arena(self, pointer, t)
            self._arena[pointer] = arena
            self.arenas.append(arena)
        arena.handle(t, event)

    ArenaCreateVM = ArenaCreateCL = ArenaAlloc = ArenaFree = ArenaPollBegin = \
        ArenaPollEnd = PoolInit = PoolFinish = delegate_to_arena

    def EventClockSync(self, t, event):
        "A clock sync event was handled, so request a redraw."
        self.needs_redraw()

    def Intern(self, t, event):
        "Remember the interned string under its stringId."
        self._intern[event.stringId] = event.string.decode('ascii', 'replace')

    def Label(self, t, event):
        "Associate an address with an interned string."
        self._label[event.address] = event.stringId

    def LabelPointer(self, t, event):
        "Associate a pointer with an interned string."
        self._label[event.pointer] = event.stringId

    def ArenaDestroy(self, t, event):
        "Forget the live arena; it stays in the list of all arenas."
        del self._arena[event.arena]
class ApplicationToolbar(NavigationToolbar):
    """Subclass of Matplotlib's navigation toolbar adding a pause button.

    The 'paused' attribute is True while the pause button is checked.

    """

    def __init__(self, *args):
        "Create the toolbar; arguments are passed to NavigationToolbar."
        # Build a fresh per-instance sequence instead of using "+=": when
        # the inherited 'toolitems' is a list, "+=" would extend the shared
        # class-level list in place, so every toolbar class instance (and
        # the base toolbar) would gain another pause item.
        self.toolitems = [
            *self.toolitems,
            ('Pause', 'Pause', PAUSE_ICON, 'pause'),
        ]
        # The base class builds its buttons from self.toolitems, so the
        # extra item must be in place before calling its __init__.
        super().__init__(*args)
        self._actions['pause'].setCheckable(True)
        self.paused = False

    def pause(self):
        "Toggle the pause button."
        self.paused = not self.paused
        self._actions['pause'].setChecked(self.paused)
class ApplicationWindow(QtWidgets.QMainWindow):
    """PyQt5 application displaying time series derived from MPS telemetry
    output.

    The window shows a scrollable column of checkboxes (one per time
    series that is ready to plot) beside a Matplotlib canvas with two
    y-axes sharing one x-axis, and a toolbar at the bottom. A timer
    calls _update repeatedly to pull new telemetry into the model and
    redraw the graph unless paused.

    """

    def __init__(self, model : Model, title : str):
        """Create application. 'model' is the MPS model whose time series are
        to be displayed, and 'title' is the main window title.
        """
        super().__init__()
        self._model = model       # The MPS model.
        self._home_limits = None  # Limits of the graph in "home" position.
        self._line_checkbox = {}  # Line -> QCheckbox
        self.setWindowTitle(title)

        # Control-W (Command-W on macOS) shortcut for closing the window.
        shortcut = QtWidgets.QShortcut(QtGui.QKeySequence("Ctrl+W"), self)
        shortcut.activated.connect(self.close)

        main = QtWidgets.QWidget()
        self.setCentralWidget(main)
        main_layout = QtWidgets.QHBoxLayout(main)

        # Scrollable list of checkboxes, one for each time series.
        # self._lines holds the checkboxes; the stretch added after it
        # takes up the spare vertical space in the containing layout.
        self._lines = QtWidgets.QVBoxLayout()
        self._lines_scroll = QtWidgets.QScrollArea(
            horizontalScrollBarPolicy=QtCore.Qt.ScrollBarAlwaysOff)
        self._lines_widget = QtWidgets.QWidget()
        lines_layout = QtWidgets.QVBoxLayout(self._lines_widget)
        lines_layout.addLayout(self._lines)
        lines_layout.addStretch(1)
        self._lines_scroll.setWidget(self._lines_widget)
        self._lines_scroll.setWidgetResizable(True)
        main_layout.addWidget(self._lines_scroll)

        # Matplot canvas and toolbar. The second axes shares the x-axis
        # with the first (twinx) so that two different units can be drawn.
        canvas = FigureCanvas(Figure(figsize=(10, 6)))
        self._axes = canvas.figure.subplots()
        self._axes2 = self._axes.twinx()
        main_layout.addWidget(canvas)
        self._toolbar = ApplicationToolbar(canvas, self)
        self.addToolBar(QtCore.Qt.BottomToolBarArea, self._toolbar)

        # Call self._update in a loop forever: once now, and then from a
        # canvas timer created with an interval of 100 (milliseconds,
        # per Matplotlib's timer documentation).
        self._update()
        self._timer = canvas.new_timer(100, [(self._update, (), {})])
        self._timer.start()

    @property
    def _limits(self):
        "Current x and y limits of the Matplotlib graph."
        # Only the primary axes are consulted, not the twinned ones.
        return self._axes.get_xlim(), self._axes.get_ylim()

    def _update(self):
        "Update the model and redraw if not paused."
        if (not self._toolbar.paused
            and self._home_limits not in (None, self._limits)):
            # Limits changed (for example, because user zoomed in), so pause
            # further updates to give user a chance to explore.
            self._toolbar.pause()
            self._home_limits = None

        # The model is updated even while paused; only plotting stops.
        self._model.update()
        if not self._toolbar.paused:
            self._model.plot([self._axes, self._axes2])
            # Remember the limits produced by this plot so that a later
            # difference can be detected by the check above.
            self._home_limits = self._limits

        # Find new time series and create corresponding checkboxes.
        for line in self._model.lines:
            if not line.ready:
                continue
            if line in self._line_checkbox:
                # A line's name can change dynamically (for example, because
                # of the creation of a second arena, or a Label event), so
                # ensure that it is up to date.
                self._line_checkbox[line].setText(line.name)
            else:
                checkbox = QtWidgets.QCheckBox(line.name)
                self._line_checkbox[line] = checkbox
                checkbox.setChecked(True)
                checkbox.setToolTip(line.desc)
                self._lines.addWidget(checkbox)

                # 'line' is bound as a default argument so that each
                # checkbox's callback refers to its own line rather than
                # to the loop variable's final value.
                def state_changed(state, line=line):
                    line.draw = bool(state)
                    self._model.needs_redraw()

                checkbox.stateChanged.connect(state_changed)
                checkbox.setStyleSheet(f"color:{line.color}")

        # Make the scroll area exactly as wide as its contents need.
        self._lines_scroll.setFixedWidth(
            self._lines_widget.sizeHint().width())
def main():
    """Parse the command line, open the telemetry file and run the GUI.

    The telemetry filename comes from the command line, defaulting to
    the MPS_TELEMETRY_FILENAME environment variable and then to
    'mpsio.log'. Returns the result of the Qt event loop (qapp.exec_()),
    which the caller uses as the process exit status.

    """
    parser = argparse.ArgumentParser(description="Memory Pool System Monitor.")
    parser.add_argument(
        'telemetry', metavar='FILENAME', nargs='?', type=str,
        default=os.environ.get('MPS_TELEMETRY_FILENAME', 'mpsio.log'),
        help="telemetry output from the MPS instance")
    args = parser.parse_args()

    # Open read-only and non-blocking. Both constants belong in the
    # 'flags' argument of os.open; its third argument is the permission
    # mode used when creating a file, not a place for access flags.
    # TODO: O_NONBLOCK does not work on Windows.
    fd = os.open(args.telemetry, os.O_RDONLY | os.O_NONBLOCK)
    # Keep the stream open for as long as the GUI runs, and close it
    # when the event loop exits.
    with os.fdopen(fd, 'rb') as stream:
        model = Model(telemetry_decoder(stream.read))
        qapp = QtWidgets.QApplication([])
        app = ApplicationWindow(model, args.telemetry)
        app.show()
        return qapp.exec_()
if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() helper: exit() is installed
    # by the site module for interactive use and is not guaranteed to
    # exist (for example, under "python -S").
    sys.exit(main())
# C. COPYRIGHT AND LICENCE
#
# Copyright (c) 2018 Ravenbrook Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id$