123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import sys,os,subprocess,time,traceback
- import random
- from PySide2 import QtCore
- from PySide2.QtWidgets import *
- from PySide2.QtGui import *
class Worker(QtCore.QRunnable):
    """Runnable that executes a callable and reports the outcome via signals.

    The callable is invoked as ``fn(*args, **kwargs)`` with one extra keyword
    argument, ``progress_callback``, bound to ``self.signals.progress`` so the
    callable can emit intermediate values.

    Args:
        fn: Callable to execute; it must accept a ``progress_callback``
            keyword argument.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.
    """

    def __init__(self, fn, *args, **kwargs):
        super(Worker, self).__init__()
        # Keep the call description around; run() replays it later.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Hand the progress signal to the callable so it can report back.
        self.kwargs['progress_callback'] = self.signals.progress

    @QtCore.Slot()
    def run(self):
        """Call the stored function and emit result/error, then finished.

        ``result`` is emitted with the return value on success; ``error`` is
        emitted with ``(exception type, exception, formatted traceback)`` when
        the callable raises. ``finished`` is emitted in both cases.
        """
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as exc:
            # Only ordinary errors are reported as task failures; SystemExit
            # and KeyboardInterrupt are deliberately not swallowed here.
            traceback.print_exc()
            self.signals.error.emit(
                (type(exc), exc, traceback.format_exc())
            )
        else:
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()
class WorkerSignals(QtCore.QObject):
    """Signal container used by a worker to talk to its listeners.

    Signals and their payloads:

    progress
        `int` value emitted by the running function
    result
        `object` whatever the processing function returned
    error
        `tuple` of (exctype, value, traceback.format_exc())
    finished
        no payload
    """

    # Signals that carry data.
    progress = QtCore.Signal(int)
    result = QtCore.Signal(object)
    error = QtCore.Signal(tuple)
    # Payload-free notification.
    finished = QtCore.Signal()
class App(QDialog):
    """Dialog with START/STOP buttons that runs a task on a pool worker.

    START launches ``test`` through a ``Worker`` on ``self.threadpool``;
    values the task emits are appended to the text box. STOP sets a flag the
    task polls after each step.
    """

    def __init__(self):
        QDialog.__init__(self)
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose)
        self.setGeometry(QtCore.QRect(200, 200, 500, 500))
        self.threadpool = QtCore.QThreadPool()
        # Stop flag polled by test(); defined up front so the attribute
        # exists before the first START/STOP click.
        self.stopped = False

        # Constructing the layout with `self` as parent installs it on the
        # dialog, so no separate setLayout() call is needed.
        layout = QVBoxLayout(self)

        self.startbutton = QPushButton('START')
        self.startbutton.clicked.connect(self.run)
        layout.addWidget(self.startbutton)

        self.stopbutton = QPushButton('STOP')
        self.stopbutton.clicked.connect(self.stop)
        layout.addWidget(self.stopbutton)

        self.progressbar = QProgressBar(self)
        self.progressbar.setRange(0, 1)
        layout.addWidget(self.progressbar)

        self.info = QTextEdit(self)
        self.info.append('Hello')
        layout.addWidget(self.info)

    def progress_fn(self, msg):
        """Append a progress value, converted to text, to the info box."""
        self.info.append(str(msg))

    def run_threaded_process(self, process, progress_fn, on_complete):
        """Execute ``process`` in the background with a worker.

        Args:
            process: Callable accepting a ``progress_callback`` keyword.
            progress_fn: Slot connected to the worker's ``progress`` signal.
            on_complete: Slot connected to the worker's ``finished`` signal.
        """
        worker = Worker(fn=process)
        # Wire the signals up before the worker is started so that no early
        # emission can happen while nothing is connected yet.
        worker.signals.finished.connect(on_complete)
        worker.signals.progress.connect(progress_fn)
        # A (0, 0) range makes the bar show a busy indicator.
        self.progressbar.setRange(0, 0)
        self.threadpool.start(worker)

    def run(self):
        """Clear the stop flag and start ``test`` in the background."""
        self.stopped = False
        self.run_threaded_process(self.test, self.progress_fn, self.completed)

    def stop(self):
        """Set the stop flag; ``test`` returns after its current step."""
        self.stopped = True

    def completed(self):
        """Restore the progress bar's (0, 1) range once the worker is done."""
        self.progressbar.setRange(0, 1)

    def test(self, progress_callback):
        """Emit a random integer every 0.2 s, up to 500 times or until stopped.

        Args:
            progress_callback: Object with an ``emit(int)`` method; each
                generated value is passed to it.
        """
        total = 500
        for _ in range(total):
            time.sleep(.2)
            # Integer bounds: randint rejects float arguments such as 1e4
            # on current Python versions.
            value = random.randint(1, 10000)
            progress_callback.emit(value)
            if self.stopped:
                return
# Create the Qt application, show the dialog, then run the event loop and
# use its return code as the process exit status.
app = QApplication([])
xapp = App()
xapp.show()
exit_code = app.exec_()
sys.exit(exit_code)
|