Source code for imagect.core.app
from imagect.api.app import IApp
from zope import interface
from PyQt5.QtWidgets import QApplication, QMessageBox
from PyQt5 import QtCore
from PyQt5.QtCore import QStandardPaths
from asyncqt import QEventLoop, QThreadExecutor
import asyncio
from rx.scheduler.mainloop import QtScheduler
[docs]@interface.implementer(IApp)
class App(QApplication):
"""
application
"""
def __init__(self, argv):
super().__init__(argv)
self.setApplicationName("imagect")
self.loop = QEventLoop(self)
asyncio.set_event_loop(self.loop)
self.scheduler = QtScheduler(QtCore)
[docs] def asyncio_loop(self):
return self.loop
[docs] def rx_scheduler(self):
return self.scheduler
[docs] def showMsg(self, title : str, msg : str):
"""
show message box
"""
QMessageBox.information(None, title, msg)
pass
[docs] def appDir(self) -> str :
return self.applicationDirPath()
[docs] def appDataDir(self) -> str :
return QStandardPaths.writableLocation(QStandardPaths.AppDataLocation)
if __name__ == "__main__" :
from PyQt5.QtWidgets import QProgressBar, QWidget, QLabel
import rx
from rx import operators as ops
from rx.subject import Subject
import time
import sys
class Window(QWidget):
def __init__(self):
QWidget.__init__(self)
self.setWindowTitle("Rx for Python rocks")
self.resize(600, 600)
self.setMouseTracking(True)
# This Subject is used to transmit mouse moves to labels
self.mousemove = Subject()
def mouseMoveEvent(self, event):
self.mousemove.on_next((event.x(), event.y()))
def showProgress(loop):
progress = QProgressBar()
progress.setRange(0, 99)
progress.show()
close = loop.create_future()
async def master():
await first_50()
with QThreadExecutor(1) as exec:
await loop.run_in_executor(exec, last_50)
# TODO announce completion?
await close
async def first_50():
for i in range(50):
progress.setValue(i)
await asyncio.sleep(.1)
def last_50():
for i in range(50, 100):
loop.call_soon_threadsafe(progress.setValue, i)
time.sleep(.1)
# asyncio.run_coroutine_threadsafe(master(), loop)
task = loop.create_task(master())
def cancel() :
close.set_result(0)
a = App([])
window = Window()
window.show()
showProgress(a.asyncio_loop())
text = 'TIME FLIES LIKE AN ARROW'
def on_next(info):
label, (x, y), i = info
label.move(x + i*12 + 15, y)
label.show()
def handle_label(label, i):
delayer = ops.delay(i * 0.100)
mapper = ops.map(lambda xy: (label, xy, i))
return window.mousemove.pipe(
delayer,
mapper,
)
labeler = ops.flat_map_indexed(handle_label)
mapper = ops.map(lambda c: QLabel(c, window))
rx.from_(text).pipe(
mapper,
labeler,
).subscribe(on_next, on_error=print, scheduler=a.rx_scheduler())
sys.exit(a.exec_())