#! /usr/bin/env python # An Arduino oscilloscope written in Python. # No more hassling with Processing! # Copyright 2012 by Akkana Peck: share and enjoy under the GPL v2 or later. # # This oscilloscope uses the optimized digital protocol from # Practical Arduino, by Jonathan Oxer and Hugh Blemings: # http://www.practicalarduino.com/projects/scope-logic-analyzer # If you want to use the ASCII analog protocol from Ardhinoscope, # set self.optimized = False around line 228. # Eventually this will be a command-line option, or auto-detected. # # Bugs: only one channel at a time currently works. import sys, serial import time import gtk, gobject Version = 0.2 VersionString = "Scopino v. %g, by Akkana Peck" % Version class TestArduino() : def open(self, baud=115200) : return def close() : return def readbyte(self) : # quarter second square wave t = int(time.time() * 4) % 4 if t == 0 or t == 2 : return chr(0) return chr(0xff) def readline(self) : # quarter second square wave if int(time.time() * 100) % 25 == 0 : return '255 255 255 255 255 255 255 255' return '0 0 0 0 0 0 0 0' class Arduino() : def open(self, baud=115200) : # Port may vary, so look for it: baseports = ['/dev/ttyUSB', '/dev/ttyACM'] self.ser = None for baseport in baseports : if self.ser : break for i in xrange(0, 8) : try : port = baseport + str(i) self.ser = serial.Serial(port, baud, timeout=1) print "Opened", port break except : self.ser = None pass if not self.ser : print "Couldn't open a serial port" raise IOError("Couldn't open a serial port") self.flush() self.buffer = '' def close(self) : self.ser.close() def flush(self) : self.ser.flushInput() def readbyte(self) : waiting = self.ser.inWaiting() r = self.ser.read(waiting) if not r : return None return r[-1] def readline(self) : '''Blocking readline: return the most recent line read, flushing anything else. ''' #if not self.ser : # return None while True : if '\n' in self.buffer: #self.ser.flushInput() # flush anything else lines = self.buffer.split('\n') self.buffer = lines[-1] # save the last (partial) line #print "returning '" + lines[0].strip() + "'" #print "Saving '" + self.buffer + "'" return lines[0].strip() else : # block until something to read: self.buffer += self.ser.read(1) # then read the rest: self.buffer += self.ser.read(self.ser.inWaiting()) #print "read:", self.buffer #print "That's all I read" class ScopeRow() : fontdesc = None def __init__(self, x, y, w, h, drawing_area, width_in_msec, color=None) : # Pixel size and position: self.left = x self.top = y self.width = w self.height = h # Drawing parameters: self.drawing_area = drawing_area if color : self.color = color else : self.color = gtk.gdk.Color(65535, 65535, 0) self.fgc = None self.bgc = None # Data we'll be plotting: self.width_in_msec = width_in_msec self.points = [] self.times = [] self.pointer = 0 def reconfigure(self, x, y, w, h) : self.left = x self.top = y self.width = w self.height = h def reset(self) : self.pointer = 0 def add(self, newpoint, curtime=None) : """Add a point""" #if self.pointer > self.width : # # We won't be able to show any more, so return # return if not curtime : curtime = time.time() if len(self.points) <= self.pointer : self.points.append(newpoint) self.times.append(curtime) assert(len(self.points) > self.pointer) else : self.points[self.pointer] = newpoint self.times[self.pointer] = curtime # if self.pointer > 0 : # print self.pointer, ':', newpoint, " (", \ # ((self.times[self.pointer] - self.times[self.pointer-1]) * 1000), ')' self.pointer += 1 def draw(self) : if not self.drawing_area : print "No drawing area yet!" return if not self.fgc : self.fgc = self.drawing_area.window.new_gc() self.fgc.set_rgb_fg_color(self.color) self.bgc = self.drawing_area.window.new_gc() self.bgc.set_rgb_fg_color(gtk.gdk.Color(0, 0, 0)) if self.pointer <= 0 : print "draw() but no data yet" return # Clear the page: self.drawing_area.window.draw_rectangle(self.bgc, True, self.left, self.top, self.width, self.height) av = 0 lastx = self.left lasty = self.top + self.height basetime = self.times[0] mult = self.width * 1000 / self.width_in_msec # print "drawing", self.pointer, "points" # print "spanning", self.times[0], "-", self.times[self.pointer-1], \ # "=", self.times[self.pointer-1] - self.times[0] # print "draw: mult= ", mult, "width=", self.width for i in xrange(self.pointer) : x = int(self.left + (self.times[i] - basetime) * mult) y = self.top + self.height - self.points[i] * self.height / 256 #print x, "(", (self.times[i] - basetime), ")" self.drawing_area.window.draw_line(self.fgc, lastx, lasty, x, y) lastx = x lasty = y av += self.points[i] # Note time scale self.draw_string(ScopeWindow.nice_time_string(self.width_in_msec/1000.), self.left + self.width - 120, self.top + 20) # show average -- eventually could also do stats, fft etc. self.draw_string("av = %i" % int(av / (i+1)), self.left + self.width - 100, self.top + self.height - 30) def draw_string(self, str, x, y) : """Draw a string.""" import pango if not ScopeRow.fontdesc : ScopeRow.fontdesc = pango.FontDescription("Sans Bold 14") layout = self.drawing_area.create_pango_layout(str) layout.set_font_description (ScopeRow.fontdesc) self.drawing_area.window.draw_layout(self.fgc, x, y, layout) class ScopeWindow() : def __init__(self, input, numchannels) : self.input = input self.numchannels = numchannels self.drawing_area = None self.fgc = None self.bgc = None self.width = 0 self.height = 0 self.scope_rows = [None] * self.numchannels #self.timeout = 10 # msec self.xscale = 15 # Adjusts width of pulses self.width_in_msec = 1000 # Width of full screen, in msec # Interval between reads, in milleseconds: self.interval = self.width_in_msec # Will be w_i_m / width self.lastupdate = 0 # Are we using the super-optimized Practical Arduino digital protocol? # Eventually we should figure this out ourselves from the data stream. self.optimized = True def expose_handler(self, widget, event) : #print "Expose" oldwidth = self.width oldheight = self.height self.width, self.height = self.drawing_area.window.get_size() rowheight = self.height/self.numchannels spacing = 5 # Is this the first time through? if not self.fgc : self.fgc = widget.window.new_gc() self.bgc = widget.window.new_gc() self.bgc.set_rgb_fg_color(gtk.gdk.Color(0, 0, 0)) for i in xrange(self.numchannels) : self.scope_rows[i] = ScopeRow(4, rowheight * i, self.width, rowheight-spacing, self.drawing_area, self.width_in_msec, None) elif oldwidth != self.width or oldheight != self.height : for i in xrange(self.numchannels) : self.scope_rows[i].reconfigure(4, rowheight * i, self.width, rowheight-spacing) #self.drawing_area.window.draw_rectangle(self.bgc, True, 0, 0, # self.width, self.height) self.interval = float(self.width_in_msec) / self.width def change_time(self, widget, newtime) : self.width_in_msec = newtime self.interval = float(newtime) / self.width for i in xrange(self.numchannels) : self.scope_rows[i].width_in_msec = self.width_in_msec def draw(self) : for i in xrange(self.numchannels) : self.scope_rows[i].draw() self.scope_rows[i].reset() return True def update(self) : """Read one data point from the Arduino and update our scope rows. Don't actually draw anything yet, though. """ if not self.input : print "updating only once: nothing to read" return False # Is it time yet to update? # pygtk timeouts only have millisecond granularity, # so we're calling much more frequently than that # and using time.time() to see if it's really time. t = time.time() if (t - self.lastupdate)*1000 < self.interval : # Not time yet #input.flush() return True # print "Adding a point after", (t - self.lastupdate) * 1000, "msec" self.lastupdate = t # Now read values. The Arduino may be running the original # Arduinoscope software, that writes a line like # 10 234 52 44 ... # or it may be running the optimized digitial version # from Practical Arduino, where one byte contains bits # representing (digitally read) A0 through A5 and D6 - D7. if self.optimized : c = self.input.readbyte() if c == None : # Don't use not c, that also catches c == 0x00 # print "readbyte returned nothing!" return True c = ord(c) # print "Read 0x%x" % c mask = 1 channels = [] for i in xrange(self.numchannels) : if c & mask : self.scope_rows[i].add(255, t) else : self.scope_rows[i].add(0, t) mask <<= 1 else : line = self.input.readline() #print "updating: read", line try : channels = map(int, line.split()) except : print "Skipping line:", line return True if len(channels) < self.numchannels : print "Only read", len(channels), "channels!", line return True #print channels[0:self.numchannels] for i in xrange(self.numchannels) : for j in xrange(self.xscale) : self.scope_rows[i].add(channels[i]) return True # call us back soon @staticmethod def nice_time_string(t) : if t >= 1 : return "%d sec" % t else : return "%d msec" % (t * 1000) def setup_menus(self, vbox) : menu_bar = gtk.MenuBar() vbox.pack_start(menu_bar, False, False, 2) menu_bar.show() # Set up a file menu menu = gtk.Menu() menu_item = gtk.MenuItem("Quit") menu_item.show() menu.append(menu_item) menu_item.connect("activate", gtk.main_quit) root_menu = gtk.MenuItem("File") root_menu.show() root_menu.set_submenu(menu) menu_bar.append(root_menu) # Set up a menu to allow different timescales menu = gtk.Menu() for t in ( .001, .005, .01, .05, .1, .5, 1, 5 ) : menu_item = gtk.MenuItem(ScopeWindow.nice_time_string(t)) menu_item.show() menu.append(menu_item) menu_item.connect("activate", self.change_time, int(t * 1000)) root_menu = gtk.MenuItem("Time") root_menu.show() root_menu.set_submenu(menu) menu_bar.append(root_menu) def show(self) : win = gtk.Window() vbox = gtk.VBox(False, 0) win.add(vbox) vbox.show() self.setup_menus(vbox) self.drawing_area = gtk.DrawingArea() self.drawing_area.connect("expose-event", self.expose_handler) vbox.pack_end(self.drawing_area, True, True, 2) self.drawing_area.show() win.connect("destroy", gtk.main_quit) win.set_default_size(1025, 512) # Use a timeout so we get regular interrupts: #gobject.timeout_add(self.timeout, self.update) #gobject.timeout_add(self.width_in_msec / self.width, self.update) # But timeouts can only happen on a millisecond level, and # we need data points denser than that. # So instead, process lines as fast as we can: gobject.idle_add(self.update) # and then only draw it occasionally: gobject.timeout_add(1000, self.draw) win.show() gtk.main() #arduino = TestArduino() arduino = Arduino() try : if len(sys.argv) > 1 : print "Using", sys.argv[1], "baud" arduino.open(baud=sys.argv[1]) else : arduino.open() except serial.SerialException : print "Disconnected (Serial exception)" except IOError : print "Disconnected (I/O Error)" except KeyboardInterrupt : print "Interrupt" # Doesn't actually work for more than one channel yet: # the display is okay but the data reading goes wonky. win = ScopeWindow(arduino, 1) win.show()