mirror of
				https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
				synced 2025-11-03 09:01:50 -05:00 
			
		
		
		
	added qpaeq script for GUI equalizer control to src/util
This commit is contained in:
		
							parent
							
								
									493d8b2fb7
								
							
						
					
					
						commit
						41853cc7c0
					
				
					 1 changed files with 546 additions and 0 deletions
				
			
		
							
								
								
									
										546
									
								
								src/utils/qpaeq.py
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										546
									
								
								src/utils/qpaeq.py
									
										
									
									
									
										Executable file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,546 @@
 | 
			
		|||
#!/usr/bin/env python
 | 
			
		||||
#    qpaeq is a equalizer interface for pulseaudio's equalizer sinks
 | 
			
		||||
#    Copyright (C) 2009  Jason Newton <nevion@gmail.com
 | 
			
		||||
#
 | 
			
		||||
#    This program is free software: you can redistribute it and/or modify
 | 
			
		||||
#    it under the terms of the GNU Affero General Public License as
 | 
			
		||||
#    published by the Free Software Foundation, either version 3 of the
 | 
			
		||||
#    License, or (at your option) any later version.
 | 
			
		||||
#
 | 
			
		||||
#    This program is distributed in the hope that it will be useful,
 | 
			
		||||
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
#    GNU Affero General Public License for more details.
 | 
			
		||||
#
 | 
			
		||||
#    You should have received a copy of the GNU Affero General Public License
 | 
			
		||||
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
import os,math,sys
 | 
			
		||||
import PyQt4,sip
 | 
			
		||||
from PyQt4 import QtGui,QtCore
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
import dbus.mainloop.qt
 | 
			
		||||
import dbus
 | 
			
		||||
 | 
			
		||||
import signal
 | 
			
		||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
 | 
			
		||||
SYNC_TIMEOUT = 4*1000
 | 
			
		||||
 | 
			
		||||
CORE_PATH = "/org/pulseaudio/core1"
 | 
			
		||||
CORE_IFACE = "org.PulseAudio.Core1"
 | 
			
		||||
def connect():
 | 
			
		||||
    if 'PULSE_DBUS_SERVER' in os.environ:
 | 
			
		||||
        address = os.environ['PULSE_DBUS_SERVER']
 | 
			
		||||
    else:
 | 
			
		||||
        bus = dbus.SessionBus() # Should be UserBus, but D-Bus doesn't implement that yet.
 | 
			
		||||
        server_lookup = bus.get_object('org.PulseAudio1', '/org/pulseaudio/server_lookup1')
 | 
			
		||||
        address = server_lookup.Get('org.PulseAudio.ServerLookup1', 'Address', dbus_interface='org.freedesktop.DBus.Properties')
 | 
			
		||||
    return dbus.connection.Connection(address)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#TODO: signals: sink Filter changed, sink reconfigured (window size) (sink iface)
 | 
			
		||||
#TODO: manager signals: new sink, removed sink, new profile, removed profile
 | 
			
		||||
#TODO: add support for changing of window_size 1000-fft_size (adv option)
 | 
			
		||||
#TODO: reconnect support loop 1 second trying to reconnect
 | 
			
		||||
#TODO: just resample the filters for profiles when loading to different sizes
 | 
			
		||||
#TODO: add preamp
 | 
			
		||||
prop_iface='org.freedesktop.DBus.Properties'
 | 
			
		||||
eq_iface='org.PulseAudio.Ext.Equalizing1.Equalizer'
 | 
			
		||||
device_iface='org.PulseAudio.Core1.Device'
 | 
			
		||||
class QPaeq(QtGui.QWidget):
 | 
			
		||||
    manager_path='/org/pulseaudio/equalizing1'
 | 
			
		||||
    manager_iface='org.PulseAudio.Ext.Equalizing1.Manager'
 | 
			
		||||
    core_iface='org.PulseAudio.Core1'
 | 
			
		||||
    core_path='/org/pulseaudio/core1'
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        QtGui.QWidget.__init__(self)
 | 
			
		||||
        self.setWindowTitle('qpaeq')
 | 
			
		||||
        self.slider_widget=None
 | 
			
		||||
        self.sink_name=None
 | 
			
		||||
        self.filter_state=None
 | 
			
		||||
 | 
			
		||||
        self.create_layout()
 | 
			
		||||
 | 
			
		||||
        self.set_connection()
 | 
			
		||||
        self.connect_to_sink(self.sinks[0])
 | 
			
		||||
        self.set_callbacks()
 | 
			
		||||
        self.setMinimumSize(self.sizeHint())
 | 
			
		||||
 | 
			
		||||
    def create_layout(self):
 | 
			
		||||
        self.main_layout=QtGui.QVBoxLayout()
 | 
			
		||||
        self.setLayout(self.main_layout)
 | 
			
		||||
        toprow_layout=QtGui.QHBoxLayout()
 | 
			
		||||
        sizePolicy = QtGui.QSizePolicy(QtGui.QSizePolicy.Preferred, QtGui.QSizePolicy.Fixed)
 | 
			
		||||
        sizePolicy.setHorizontalStretch(0)
 | 
			
		||||
        sizePolicy.setVerticalStretch(0)
 | 
			
		||||
        #sizePolicy.setHeightForWidth(self.profile_box.sizePolicy().hasHeightForWidth())
 | 
			
		||||
 | 
			
		||||
        toprow_layout.addWidget(QtGui.QLabel('Sink'))
 | 
			
		||||
        self.sink_box = QtGui.QComboBox()
 | 
			
		||||
        self.sink_box.setSizePolicy(sizePolicy)
 | 
			
		||||
        self.sink_box.setDuplicatesEnabled(False)
 | 
			
		||||
        self.sink_box.setInsertPolicy(QtGui.QComboBox.InsertAlphabetically)
 | 
			
		||||
        #self.sink_box.setSizeAdjustPolicy(QtGui.QComboBox.AdjustToContents)
 | 
			
		||||
        toprow_layout.addWidget(self.sink_box)
 | 
			
		||||
 | 
			
		||||
        toprow_layout.addWidget(QtGui.QLabel('Channel'))
 | 
			
		||||
        self.channel_box = QtGui.QComboBox()
 | 
			
		||||
        self.channel_box.setSizePolicy(sizePolicy)
 | 
			
		||||
        toprow_layout.addWidget(self.channel_box)
 | 
			
		||||
 | 
			
		||||
        toprow_layout.addWidget(QtGui.QLabel('Preset'))
 | 
			
		||||
        self.profile_box = QtGui.QComboBox()
 | 
			
		||||
        self.profile_box.setSizePolicy(sizePolicy)
 | 
			
		||||
        self.profile_box.setInsertPolicy(QtGui.QComboBox.InsertAlphabetically)
 | 
			
		||||
        #self.profile_box.setSizeAdjustPolicy(QtGui.QComboBox.AdjustToContents)
 | 
			
		||||
        toprow_layout.addWidget(self.profile_box)
 | 
			
		||||
 | 
			
		||||
        large_icon_size=self.style().pixelMetric(QtGui.QStyle.PM_LargeIconSize)
 | 
			
		||||
        large_icon_size=QtCore.QSize(large_icon_size,large_icon_size)
 | 
			
		||||
        save_profile=QtGui.QToolButton()
 | 
			
		||||
        save_profile.setIcon(self.style().standardIcon(QtGui.QStyle.SP_DriveFDIcon))
 | 
			
		||||
        save_profile.setIconSize(large_icon_size)
 | 
			
		||||
        save_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly)
 | 
			
		||||
        save_profile.clicked.connect(self.save_profile)
 | 
			
		||||
        remove_profile=QtGui.QToolButton()
 | 
			
		||||
        remove_profile.setIcon(self.style().standardIcon(QtGui.QStyle.SP_TrashIcon))
 | 
			
		||||
        remove_profile.setIconSize(large_icon_size)
 | 
			
		||||
        remove_profile.setToolButtonStyle(QtCore.Qt.ToolButtonIconOnly)
 | 
			
		||||
        remove_profile.clicked.connect(self.remove_profile)
 | 
			
		||||
        toprow_layout.addWidget(save_profile)
 | 
			
		||||
        toprow_layout.addWidget(remove_profile)
 | 
			
		||||
 | 
			
		||||
        reset_button = QtGui.QPushButton('Reset')
 | 
			
		||||
        reset_button.clicked.connect(self.reset)
 | 
			
		||||
        toprow_layout.addStretch()
 | 
			
		||||
        toprow_layout.addWidget(reset_button)
 | 
			
		||||
        self.layout().addLayout(toprow_layout)
 | 
			
		||||
 | 
			
		||||
        self.profile_box.activated.connect(self.load_profile)
 | 
			
		||||
        self.channel_box.activated.connect(self.select_channel)
 | 
			
		||||
    def connect_to_sink(self,name):
 | 
			
		||||
        #TODO: clear slots for profile buttons
 | 
			
		||||
 | 
			
		||||
        #flush any pending saves for other sinks
 | 
			
		||||
        if self.filter_state is not None:
 | 
			
		||||
            self.filter_state.flush_state()
 | 
			
		||||
        sink=self.connection.get_object(object_path=name)
 | 
			
		||||
        self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
 | 
			
		||||
        self.sink=dbus.Interface(sink,dbus_interface=eq_iface)
 | 
			
		||||
        self.filter_state=FilterState(sink)
 | 
			
		||||
        #sample_rate,filter_rate,channels,channel)
 | 
			
		||||
 | 
			
		||||
        self.channel_box.clear()
 | 
			
		||||
        self.channel_box.addItem('All',self.filter_state.channels)
 | 
			
		||||
        for i in xrange(self.filter_state.channels):
 | 
			
		||||
            self.channel_box.addItem('%d' %(i+1,),i)
 | 
			
		||||
        self.setMinimumSize(self.sizeHint())
 | 
			
		||||
 | 
			
		||||
        self.set_slider_widget(SliderArray(self.filter_state))
 | 
			
		||||
 | 
			
		||||
        self.sink_name=name
 | 
			
		||||
        #set the signal listener for this sink
 | 
			
		||||
        core=self._get_core()
 | 
			
		||||
        #temporary hack until signal filtering works properly
 | 
			
		||||
        core.ListenForSignal('',[dbus.ObjectPath(self.sink_name),dbus.ObjectPath(self.manager_path)])
 | 
			
		||||
        #for x in ['FilterChanged']:
 | 
			
		||||
        #    core.ListenForSignal("%s.%s" %(self.eq_iface,x),[dbus.ObjectPath(self.sink_name)])
 | 
			
		||||
        #core.ListenForSignal(self.eq_iface,[dbus.ObjectPath(self.sink_name)])
 | 
			
		||||
        self.sink.connect_to_signal('FilterChanged',self.read_filter)
 | 
			
		||||
 | 
			
		||||
    def set_slider_widget(self,widget):
 | 
			
		||||
        layout=self.layout()
 | 
			
		||||
        if self.slider_widget is not None:
 | 
			
		||||
            i=layout.indexOf(self.slider_widget)
 | 
			
		||||
            layout.removeWidget(self.slider_widget)
 | 
			
		||||
            self.slider_widget.deleteLater()
 | 
			
		||||
            layout.insertWidget(i,self.slider_widget)
 | 
			
		||||
        else:
 | 
			
		||||
            layout.addWidget(widget)
 | 
			
		||||
        self.slider_widget=widget
 | 
			
		||||
        self.read_filter()
 | 
			
		||||
    def _get_core(self):
 | 
			
		||||
        core_obj=self.connection.get_object(object_path=self.core_path)
 | 
			
		||||
        core=dbus.Interface(core_obj,dbus_interface=self.core_iface)
 | 
			
		||||
        return core
 | 
			
		||||
    def sink_added(self,sink):
 | 
			
		||||
        #TODO: preserve selected sink
 | 
			
		||||
        self.update_sinks()
 | 
			
		||||
    def sink_removed(self,sink):
 | 
			
		||||
        #TODO: preserve selected sink, try connecting to backup otherwise
 | 
			
		||||
        if sink==self.sink_name:
 | 
			
		||||
            #connect to new sink?
 | 
			
		||||
            pass
 | 
			
		||||
        self.update_sinks()
 | 
			
		||||
    def save_profile(self):
 | 
			
		||||
        #popup dialog box for name
 | 
			
		||||
        current=self.profile_box.currentIndex()
 | 
			
		||||
        profile,ok=QtGui.QInputDialog.getItem(self,'Preset Name','Preset',self.profiles,current)
 | 
			
		||||
        if not ok or profile=='':
 | 
			
		||||
            return
 | 
			
		||||
        if profile in self.profiles:
 | 
			
		||||
            mbox=QtGui.QMessageBox(self)
 | 
			
		||||
            mbox.setText('%s preset already exists'%(profile,))
 | 
			
		||||
            mbox.setInformativeText('Do you want to save over it?')
 | 
			
		||||
            mbox.setStandardButtons(mbox.Save|mbox.Discard|mbox.Cancel)
 | 
			
		||||
            mbox.setDefaultButton(mbox.Save)
 | 
			
		||||
            ret=mbox.exec_()
 | 
			
		||||
            if ret!=mbox.Save:
 | 
			
		||||
                return
 | 
			
		||||
        self.sink.SaveProfile(self.filter_state.channel,dbus.String(profile))
 | 
			
		||||
        if self.filter_state.channel==self.filter_state.channels:
 | 
			
		||||
            for x in range(1,self.filter_state.channels):
 | 
			
		||||
                self.sink.LoadProfile(x,dbus.String(profile))
 | 
			
		||||
    def remove_profile(self):
 | 
			
		||||
        #find active profile name, remove it
 | 
			
		||||
        profile=self.profile_box.currentText()
 | 
			
		||||
        manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface)
 | 
			
		||||
        manager.RemoveProfile(dbus.String(profile))
 | 
			
		||||
    def load_profile(self,x):
 | 
			
		||||
        profile=self.profile_box.itemText(x)
 | 
			
		||||
        self.filter_state.load_profile(profile)
 | 
			
		||||
    def select_channel(self,x):
 | 
			
		||||
        self.filter_state.channel = self.channel_box.itemData(x).toPyObject()
 | 
			
		||||
        self._set_profile_name()
 | 
			
		||||
        self.filter_state.readback()
 | 
			
		||||
 | 
			
		||||
    #TODO: add back in preamp!
 | 
			
		||||
    #print frequencies
 | 
			
		||||
    #main_layout.addLayout(self.create_slider(partial(self.update_coefficient,0),
 | 
			
		||||
    #    'Preamp')[0]
 | 
			
		||||
    #)
 | 
			
		||||
    def set_connection(self):
 | 
			
		||||
        self.connection=connect()
 | 
			
		||||
        self.manager_obj=self.connection.get_object(object_path=self.manager_path)
 | 
			
		||||
        manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface)
 | 
			
		||||
        self.sinks=manager_props.Get(self.manager_iface,'EqualizedSinks')
 | 
			
		||||
    def set_callbacks(self):
 | 
			
		||||
        manager=dbus.Interface(self.manager_obj,dbus_interface=self.manager_iface)
 | 
			
		||||
        manager.connect_to_signal('ProfilesChanged',self.update_profiles)
 | 
			
		||||
        manager.connect_to_signal('SinkAdded',self.sink_added)
 | 
			
		||||
        manager.connect_to_signal('SinkRemoved',self.sink_removed)
 | 
			
		||||
        #self._get_core().ListenForSignal(self.manager_iface,[])
 | 
			
		||||
        #self._get_core().ListenForSignal(self.manager_iface,[dbus.ObjectPath(self.manager_path)])
 | 
			
		||||
        #core=self._get_core()
 | 
			
		||||
        #for x in ['ProfilesChanged','SinkAdded','SinkRemoved']:
 | 
			
		||||
        #    core.ListenForSignal("%s.%s" %(self.manager_iface,x),[dbus.ObjectPath(self.manager_path)])
 | 
			
		||||
        self.update_profiles()
 | 
			
		||||
        self.update_sinks()
 | 
			
		||||
    def update_profiles(self):
 | 
			
		||||
        #print 'update profiles called!'
 | 
			
		||||
        manager_props=dbus.Interface(self.manager_obj,dbus_interface=prop_iface)
 | 
			
		||||
        self.profiles=manager_props.Get(self.manager_iface,'Profiles')
 | 
			
		||||
        self.profile_box.blockSignals(True)
 | 
			
		||||
        self.profile_box.clear()
 | 
			
		||||
        self.profile_box.addItems(self.profiles)
 | 
			
		||||
        self.profile_box.blockSignals(False)
 | 
			
		||||
        self._set_profile_name()
 | 
			
		||||
    def update_sinks(self):
 | 
			
		||||
        self.sink_box.blockSignals(True)
 | 
			
		||||
        self.sink_box.clear()
 | 
			
		||||
        for x in self.sinks:
 | 
			
		||||
            sink=self.connection.get_object(object_path=x)
 | 
			
		||||
            sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
 | 
			
		||||
            simple_name=sink_props.Get(device_iface,'Name')
 | 
			
		||||
            self.sink_box.addItem(simple_name,x)
 | 
			
		||||
        self.sink_box.blockSignals(False)
 | 
			
		||||
        self.sink_box.setMinimumSize(self.sink_box.sizeHint())
 | 
			
		||||
    def read_filter(self):
 | 
			
		||||
        #print self.filter_frequencies
 | 
			
		||||
        self.filter_state.readback()
 | 
			
		||||
    def reset(self):
 | 
			
		||||
        coefs=dbus.Array([1/math.sqrt(2.0)]*(self.filter_state.filter_rate//2+1))
 | 
			
		||||
        preamp=1.0
 | 
			
		||||
        self.filter_state.set_filter(preamp,coefs)
 | 
			
		||||
    def _set_profile_name(self):
 | 
			
		||||
        self.profile_box.blockSignals(True)
 | 
			
		||||
        profile_name=self.sink.BaseProfile(self.filter_state.channel)
 | 
			
		||||
        if profile_name is not None:
 | 
			
		||||
            i=self.profile_box.findText(profile_name)
 | 
			
		||||
            if i>=0:
 | 
			
		||||
                self.profile_box.setCurrentIndex(i)
 | 
			
		||||
        self.profile_box.blockSignals(False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SliderArray(QtGui.QWidget):
 | 
			
		||||
    def __init__(self,filter_state,parent=None):
 | 
			
		||||
        super(SliderArray,self).__init__(parent)
 | 
			
		||||
        #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;')
 | 
			
		||||
        #self.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('blue'))
 | 
			
		||||
        self.filter_state=filter_state
 | 
			
		||||
        self.setLayout(QtGui.QHBoxLayout())
 | 
			
		||||
        self.sub_array=None
 | 
			
		||||
        self.set_sub_array(SliderArraySub(self.filter_state))
 | 
			
		||||
        self.inhibit_resize=0
 | 
			
		||||
    def set_sub_array(self,widget):
 | 
			
		||||
        if self.sub_array is not None:
 | 
			
		||||
            self.layout().removeWidget(self.sub_array)
 | 
			
		||||
            self.sub_array.disconnect_signals()
 | 
			
		||||
            self.sub_array.deleteLater()
 | 
			
		||||
        self.sub_array=widget
 | 
			
		||||
        self.layout().addWidget(self.sub_array)
 | 
			
		||||
        self.sub_array.connect_signals()
 | 
			
		||||
        self.filter_state.readback()
 | 
			
		||||
    def resizeEvent(self,event):
 | 
			
		||||
        super(SliderArray,self).resizeEvent(event)
 | 
			
		||||
        if self.inhibit_resize==0:
 | 
			
		||||
            self.inhibit_resize+=1
 | 
			
		||||
            #self.add_sliders_to_fit()
 | 
			
		||||
            t=QtCore.QTimer(self)
 | 
			
		||||
            t.setSingleShot(True)
 | 
			
		||||
            t.setInterval(0)
 | 
			
		||||
            t.timeout.connect(partial(self.add_sliders_to_fit,event))
 | 
			
		||||
            t.start()
 | 
			
		||||
    def add_sliders_to_fit(self,event):
 | 
			
		||||
        if event.oldSize().width()>0 and event.size().width()>0:
 | 
			
		||||
            i=len(self.filter_state.frequencies)*int(round(float(event.size().width())/event.oldSize().width()))
 | 
			
		||||
        else:
 | 
			
		||||
            i=len(self.filter_state.frequencies)
 | 
			
		||||
 | 
			
		||||
        t_w=self.size().width()
 | 
			
		||||
        def evaluate(filter_state, target, variable):
 | 
			
		||||
            base_freqs=self.filter_state.freq_proper(self.filter_state.DEFAULT_FREQUENCIES)
 | 
			
		||||
            filter_state._set_frequency_values(subdivide(base_freqs,variable))
 | 
			
		||||
            new_widget=SliderArraySub(filter_state)
 | 
			
		||||
            w=new_widget.sizeHint().width()
 | 
			
		||||
            return w-target
 | 
			
		||||
        def searcher(initial,evaluator):
 | 
			
		||||
            i=initial
 | 
			
		||||
            def d(e): return 1 if e>=0 else -1
 | 
			
		||||
            error=evaluator(i)
 | 
			
		||||
            old_direction=d(error)
 | 
			
		||||
            i-=old_direction
 | 
			
		||||
            while True:
 | 
			
		||||
                error=evaluator(i)
 | 
			
		||||
                direction=d(error)
 | 
			
		||||
                if direction!=old_direction:
 | 
			
		||||
                    k=i-1
 | 
			
		||||
                    #while direction<0 and error!=0:
 | 
			
		||||
                    #    k-=1
 | 
			
		||||
                    #    error=evaluator(i)
 | 
			
		||||
                    #    direction=d(error)
 | 
			
		||||
                    return k, evaluator(k)
 | 
			
		||||
                i-=direction
 | 
			
		||||
                old_direction=direction
 | 
			
		||||
        searcher(i,partial(evaluate,self.filter_state,t_w))
 | 
			
		||||
        self.set_sub_array(SliderArraySub(self.filter_state))
 | 
			
		||||
        self.inhibit_resize-=1
 | 
			
		||||
 | 
			
		||||
class SliderArraySub(QtGui.QWidget):
 | 
			
		||||
    def __init__(self,filter_state,parent=None):
 | 
			
		||||
        super(SliderArraySub,self).__init__(parent)
 | 
			
		||||
        self.filter_state=filter_state
 | 
			
		||||
        self.setLayout(QtGui.QGridLayout())
 | 
			
		||||
        self.slider=[None]*len(self.filter_state.frequencies)
 | 
			
		||||
        self.label=[None]*len(self.slider)
 | 
			
		||||
        #self.setStyleSheet('padding: 0px; border-width: 0px; margin: 0px;')
 | 
			
		||||
        #self.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('blue'))
 | 
			
		||||
        qt=QtCore.Qt
 | 
			
		||||
        #self.layout().setHorizontalSpacing(1)
 | 
			
		||||
        def add_slider(slider,label, c):
 | 
			
		||||
            self.layout().addWidget(slider,0,c,qt.AlignHCenter)
 | 
			
		||||
            self.layout().addWidget(label,1,c,qt.AlignHCenter)
 | 
			
		||||
            self.layout().setColumnMinimumWidth(c,max(label.sizeHint().width(),slider.sizeHint().width()))
 | 
			
		||||
        def create_slider(slider_label):
 | 
			
		||||
            slider=QtGui.QSlider(QtCore.Qt.Vertical,self)
 | 
			
		||||
            label=SliderLabel(slider_label,filter_state,self)
 | 
			
		||||
            slider.setRange(-1000,2000)
 | 
			
		||||
            slider.setSingleStep(1)
 | 
			
		||||
            return (slider,label)
 | 
			
		||||
        self.preamp_slider,self.preamp_label=create_slider('Preamp')
 | 
			
		||||
        add_slider(self.preamp_slider,self.preamp_label,0)
 | 
			
		||||
        for i,hz in enumerate(self.filter_state.frequencies):
 | 
			
		||||
            slider,label=create_slider(self.hz2label(hz))
 | 
			
		||||
            self.slider[i]=slider
 | 
			
		||||
            #slider.setStyleSheet('font-size: 7pt; font-family: monospace;'+outline%('red',))
 | 
			
		||||
            self.label[i]=label
 | 
			
		||||
            c=i+1
 | 
			
		||||
            add_slider(slider,label,i+1)
 | 
			
		||||
    def hz2label(self, hz):
 | 
			
		||||
        if hz==0:
 | 
			
		||||
            label_text='DC'
 | 
			
		||||
        elif hz==self.filter_state.sample_rate//2:
 | 
			
		||||
            label_text='Coda'
 | 
			
		||||
        else:
 | 
			
		||||
            label_text=hz2str(hz)
 | 
			
		||||
        return label_text
 | 
			
		||||
 | 
			
		||||
    def connect_signals(self):
 | 
			
		||||
        def connect(writer,reader,slider,label):
 | 
			
		||||
            slider.valueChanged.connect(writer)
 | 
			
		||||
            self.filter_state.readFilter.connect(reader)
 | 
			
		||||
            label_cb=partial(slider.setValue,0)
 | 
			
		||||
            label.clicked.connect(label_cb)
 | 
			
		||||
            return label_cb
 | 
			
		||||
 | 
			
		||||
        self.preamp_writer_cb=self.write_preamp
 | 
			
		||||
        self.preamp_reader_cb=self.sync_preamp
 | 
			
		||||
        self.preamp_label_cb=connect(self.preamp_writer_cb,
 | 
			
		||||
                self.preamp_reader_cb,
 | 
			
		||||
                self.preamp_slider,
 | 
			
		||||
                self.preamp_label)
 | 
			
		||||
        self.writer_callbacks=[None]*len(self.slider)
 | 
			
		||||
        self.reader_callbacks=[None]*len(self.slider)
 | 
			
		||||
        self.label_callbacks=[None]*len(self.label)
 | 
			
		||||
        for i in range(len(self.slider)):
 | 
			
		||||
            self.writer_callbacks[i]=partial(self.write_coefficient,i)
 | 
			
		||||
            self.reader_callbacks[i]=partial(self.sync_coefficient,i)
 | 
			
		||||
            self.label_callbacks[i]=connect(self.writer_callbacks[i],
 | 
			
		||||
                    self.reader_callbacks[i],
 | 
			
		||||
                    self.slider[i],
 | 
			
		||||
                    self.label[i])
 | 
			
		||||
    def disconnect_signals(self):
 | 
			
		||||
        def disconnect(writer,reader,label_cb,slider,label):
 | 
			
		||||
            slider.valueChanged.disconnect(writer)
 | 
			
		||||
            self.filter_state.readFilter.disconnect(reader)
 | 
			
		||||
            label.clicked.disconnect(label_cb)
 | 
			
		||||
        disconnect(self.preamp_writer_cb, self.preamp_reader_cb,
 | 
			
		||||
                self.preamp_label_cb, self.preamp_slider, self.preamp_label)
 | 
			
		||||
        for i in range(len(self.slider)):
 | 
			
		||||
            disconnect(self.writer_callbacks[i],
 | 
			
		||||
                    self.reader_callbacks[i],
 | 
			
		||||
                    self.label_callbacks[i],
 | 
			
		||||
                    self.slider[i],
 | 
			
		||||
                    self.label[i])
 | 
			
		||||
 | 
			
		||||
    def write_preamp(self, v):
 | 
			
		||||
        self.filter_state.preamp=self.slider2coef(v)
 | 
			
		||||
        self.filter_state.seed()
 | 
			
		||||
    def sync_preamp(self):
 | 
			
		||||
        self.preamp_slider.blockSignals(True)
 | 
			
		||||
        self.preamp_slider.setValue(self.coef2slider(self.filter_state.preamp))
 | 
			
		||||
        self.preamp_slider.blockSignals(False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def write_coefficient(self,i,v):
 | 
			
		||||
        self.filter_state.coefficients[i]=self.slider2coef(v)/math.sqrt(2.0)
 | 
			
		||||
        self.filter_state.seed()
 | 
			
		||||
    def sync_coefficient(self,i):
 | 
			
		||||
        slider=self.slider[i]
 | 
			
		||||
        slider.blockSignals(True)
 | 
			
		||||
        slider.setValue(self.coef2slider(math.sqrt(2.0)*self.filter_state.coefficients[i]))
 | 
			
		||||
        slider.blockSignals(False)
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def slider2coef(x):
 | 
			
		||||
        return (1.0+(x/1000.0))
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def coef2slider(x):
 | 
			
		||||
        return int((x-1.0)*1000)
 | 
			
		||||
outline='border-width: 1px; border-style: solid; border-color: %s;'
 | 
			
		||||
 | 
			
		||||
class SliderLabel(QtGui.QLabel):
 | 
			
		||||
    clicked=QtCore.pyqtSignal()
 | 
			
		||||
    def __init__(self,label_text,filter_state,parent=None):
 | 
			
		||||
        super(SliderLabel,self).__init__(parent)
 | 
			
		||||
        self.setStyleSheet('font-size: 7pt; font-family: monospace;')
 | 
			
		||||
        self.setText(label_text)
 | 
			
		||||
        self.setMinimumSize(self.sizeHint())
 | 
			
		||||
    def mouseDoubleClickEvent(self, event):
 | 
			
		||||
        self.clicked.emit()
 | 
			
		||||
        super(SliderLabel,self).mouseDoubleClickEvent(event)
 | 
			
		||||
 | 
			
		||||
#until there are server side state savings, do it in the client but try and avoid
 | 
			
		||||
#simulaneous broadcasting situations
 | 
			
		||||
class FilterState(QtCore.QObject):
 | 
			
		||||
    #DEFAULT_FREQUENCIES=map(float,[25,50,75,100,150,200,300,400,500,800,1e3,1.5e3,3e3,5e3,7e3,10e3,15e3,20e3])
 | 
			
		||||
    DEFAULT_FREQUENCIES=[31.75,63.5,125,250,500,1e3,2e3,4e3,8e3,16e3]
 | 
			
		||||
    readFilter=QtCore.pyqtSignal()
 | 
			
		||||
    def __init__(self,sink):
 | 
			
		||||
        super(FilterState,self).__init__()
 | 
			
		||||
        self.sink_props=dbus.Interface(sink,dbus_interface=prop_iface)
 | 
			
		||||
        self.sink=dbus.Interface(sink,dbus_interface=eq_iface)
 | 
			
		||||
        self.sample_rate=self.get_eq_attr('SampleRate')
 | 
			
		||||
        self.filter_rate=self.get_eq_attr('FilterSampleRate')
 | 
			
		||||
        self.channels=self.get_eq_attr('NChannels')
 | 
			
		||||
        self.channel=self.channels
 | 
			
		||||
        self.set_frequency_values(self.DEFAULT_FREQUENCIES)
 | 
			
		||||
        self.sync_timer=QtCore.QTimer()
 | 
			
		||||
        self.sync_timer.setSingleShot(True)
 | 
			
		||||
        self.sync_timer.timeout.connect(self.save_state)
 | 
			
		||||
 | 
			
		||||
    def get_eq_attr(self,attr):
 | 
			
		||||
        return self.sink_props.Get(eq_iface,attr)
 | 
			
		||||
    def freq_proper(self,xs):
 | 
			
		||||
        return [0]+xs+[self.sample_rate//2]
 | 
			
		||||
    def _set_frequency_values(self,freqs):
 | 
			
		||||
        self.frequencies=freqs
 | 
			
		||||
        #print 'base',self.frequencies
 | 
			
		||||
        self.filter_frequencies=map(lambda x: int(round(x)), \
 | 
			
		||||
                self.translate_rates(self.filter_rate,self.sample_rate,
 | 
			
		||||
                    self.frequencies) \
 | 
			
		||||
                )
 | 
			
		||||
        self.coefficients=[0.0]*len(self.frequencies)
 | 
			
		||||
        self.preamp=1.0
 | 
			
		||||
    def set_frequency_values(self,freqs):
 | 
			
		||||
        self._set_frequency_values(self.freq_proper(freqs))
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def translate_rates(dst,src,rates):
 | 
			
		||||
        return list(map(lambda x: x*dst/src,rates))
 | 
			
		||||
    def seed(self):
 | 
			
		||||
        self.sink.SeedFilter(self.channel,self.filter_frequencies,self.coefficients,self.preamp)
 | 
			
		||||
        self.sync_timer.start(SYNC_TIMEOUT)
 | 
			
		||||
    def readback(self):
 | 
			
		||||
        coefs,preamp=self.sink.FilterAtPoints(self.channel,self.filter_frequencies)
 | 
			
		||||
        self.coefficients=coefs
 | 
			
		||||
        self.preamp=preamp
 | 
			
		||||
        self.readFilter.emit()
 | 
			
		||||
    def set_filter(self,preamp,coefs):
 | 
			
		||||
        self.sink.SetFilter(self.channel,dbus.Array(coefs),self.preamp)
 | 
			
		||||
        self.sync_timer.start(SYNC_TIMEOUT)
 | 
			
		||||
    def save_state(self):
 | 
			
		||||
        print 'saving state'
 | 
			
		||||
        self.sink.SaveState()
 | 
			
		||||
    def load_profile(self,profile):
 | 
			
		||||
        self.sink.LoadProfile(self.channel,dbus.String(profile))
 | 
			
		||||
        self.sync_timer.start(SYNC_TIMEOUT)
 | 
			
		||||
    def flush_state(self):
 | 
			
		||||
        if self.sync_timer.isActive():
 | 
			
		||||
            self.sync_timer.stop()
 | 
			
		||||
            self.save_state()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def safe_log(k,b):
 | 
			
		||||
    i=0
 | 
			
		||||
    while k//b!=0:
 | 
			
		||||
        i+=1
 | 
			
		||||
        k=k//b
 | 
			
		||||
    return i
 | 
			
		||||
def hz2str(hz):
 | 
			
		||||
    p=safe_log(hz,10.0)
 | 
			
		||||
    if p<3:
 | 
			
		||||
        return '%dHz' %(hz,)
 | 
			
		||||
    elif hz%1000==0:
 | 
			
		||||
        return '%dKHz' %(hz/(10.0**3),)
 | 
			
		||||
    else:
 | 
			
		||||
        return '%.1fKHz' %(hz/(10.0**3),)
 | 
			
		||||
 | 
			
		||||
def subdivide(xs, t_points):
 | 
			
		||||
    while len(xs)<t_points:
 | 
			
		||||
        m=[0]*(2*len(xs)-1)
 | 
			
		||||
        m[0:len(m):2]=xs
 | 
			
		||||
        for i in range(1,len(m),2):
 | 
			
		||||
            m[i]=(m[i-1]+m[i+1])//2
 | 
			
		||||
        xs=m
 | 
			
		||||
    p_drop=len(xs)-t_points
 | 
			
		||||
    p_drop_left=p_drop//2
 | 
			
		||||
    p_drop_right=p_drop-p_drop_left
 | 
			
		||||
    #print 'xs',xs
 | 
			
		||||
    #print 'dropping %d, %d left, %d right' %(p_drop,p_drop_left,p_drop_right)
 | 
			
		||||
    c=len(xs)//2
 | 
			
		||||
    left=xs[0:p_drop_left*2:2]+xs[p_drop_left*2:c]
 | 
			
		||||
    right=list(reversed(xs[c:]))
 | 
			
		||||
    right=right[0:p_drop_right*2:2]+right[p_drop_right*2:]
 | 
			
		||||
    right=list(reversed(right))
 | 
			
		||||
    return left+right
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    dbus.mainloop.qt.DBusQtMainLoop(set_as_default=True)
 | 
			
		||||
    app=QtGui.QApplication(sys.argv)
 | 
			
		||||
    qpaeq_main=QPaeq()
 | 
			
		||||
    qpaeq_main.show()
 | 
			
		||||
    sys.exit(app.exec_())
 | 
			
		||||
 | 
			
		||||
if __name__=='__main__':
 | 
			
		||||
    main()
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue