Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

made the cli script a little faster for rollershutter controlling #34

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pyduofern/duofern_stick.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,17 +438,23 @@ class DuofernStickThreaded(DuofernStick, threading.Thread):
def __init__(self, serial_port=None, *args, **kwargs):
super().__init__(*args, **kwargs)

if serial_port is None:

self.port = None
if serial_port is not None:
self.port = serial_port
if not self.ephemeral:
self.config['port'] = serial_port
self._dump_config()
elif 'port' in self.config and not self.ephemeral: # pragma: no cover
self.port = self.config['port']
else:
try:
self.port = serial.tools.list_ports.comports()[0].device
except IndexError:
raise DuofernException(
"No serial port configured and unable to autodetect device. Did you plug in your stick?")
logger.debug("no serial port set, autodetected {} for duofern".format(self.port))
else:
self.port = serial_port

# DuofernStick.__init__(self, device, system_code, config_file_json, duofern_parser)
#print("used port:", self.port)
self.serial_connection = serial.Serial(self.port, baudrate=115200, timeout=1)
self.running = False

Expand Down
227 changes: 15 additions & 212 deletions scripts/duofern_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@

from pyduofern.duofern_stick import DuofernStickThreaded


parser = argparse.ArgumentParser(epilog="use at your own risk")

parser.add_argument('--configfile', help='location of system config file', default=None, metavar="CONFIGFILE")
parser.add_argument('--configfile', help='location of system config file (if omitted the default config file will be used)', default=None, metavar="CONFIGFILE")

parser.add_argument('--code', help='set 4-digit hex code for the system (warning, always use same code for once-paired '
'devices) best chose something on the first run and write it down.',
default=None, metavar="SYSTEMCODE")

parser.add_argument('--device',
help='path to serial port created by duofern stick, defaults to first found serial port, typically '
'something like /dev/ttyUSB0 or /dev/duofernstick if you use the provided udev rules file',
'something like /dev/ttyUSB0 or /dev/serial/by-id/... or /dev/duofernstick if you use the provided udev rules file',
default=None)

parser.add_argument('--pair', action='store_true',
Expand All @@ -45,6 +46,10 @@
'CONFIGFILE',
default=False)

parser.add_argument('--show_paired', action='store_true',
help='Show known and configured Duofern devices',
default=False)

parser.add_argument('--unpair', action='store_true',
help='Stick will wait for pairing for {} seconds. Afterwards look in the config file for the paired '
'device name or call this program again to list the newly found device. If devices that were '
Expand Down Expand Up @@ -84,192 +89,10 @@
default=None, type=str)


def splitargs(func):
def wrapper(*args, **kwargs):
func(args[0], args[1].split(" "))

return wrapper


def ids_for_names(func):
def wrapper(*args, **kwargs):
id_dict = {device['id']: device['name'] for device in args[0].stick.config['devices']
if device['name'] in args[1]}
func(args[0], id_dict)

return wrapper


class DuofernCLI(Cmd):
def __init__(self, serial_port=None, system_code=None, config_file=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stick = DuofernStickThreaded(serial_port=args.device, system_code=args.code,
config_file_json=args.configfile)
self.stick._initialize()
self.stick.start()
self.prompt = "duofern> "

def emptyline(self):
pass

def do_pair(self, args):
"""
Usage:
pair <TIMEOUT>

Start pairing mode. Pass a timeout in seconds as <TIMEOUT>.
Will return after the timeout if no devices start pairing within the given timeout.

Example:
duofern> pair 10

"""
timeout = 10
if len(args) != 0:
try:
timeout = int(args[0])
except:
print("Please use an integer number to indicate TIMEOUT in seconds")
print("Starting pairing mode... waiting {} seconds".format(int(timeout)))
self.stick.pair(timeout=timeout)
time.sleep(args.pairtime + 0.5)
self.stick.sync_devices()
print("Pairing done, Config file updated.")

def do_unpair(self, args):
"""
Usage:
unpair <TIMEOUT>

Start pairing mode. Pass a timeout in seconds as <TIMEOUT>.
Will return after the timeout if no devices start pairing within the given timeout.

Example:
duofern> unpair 10

"""
timeout = 10
if len(args) != 0:
try:
timeout = int(args[0])
except:
print("Please use an integer number to indicate TIMEOUT in seconds")
print("Starting pairing mode... waiting {} seconds".format(int(timeout)))
self.stick.unpair(timeout=timeout)
time.sleep(args.pairtime + 0.5)
self.stick.sync_devices()
print("Pairing done, Config file updated.")

def do_remote(self, args):
code = args[0][0:6]
timeout = int(args[1])
self.stick.remote(code, timeout)
time.sleep(args.pairtime + 0.5)
self.stick.sync_devices()
print("Pairing done, Config file updated.")

@splitargs
@ids_for_names
def do_up(self, blinds):
"""
Usage:
up <SHUTTER> [<SHUTTER> <SHUTTER>]

Lift one or several shutters. Accepts a list of shutter names sepatated by space.

Example:
duofern> up Livingroom
duofern> up Livingroom Kitchen
"""
for blind_id in blinds:
print("lifting {}".format(blinds[blind_id]))
self.stick.command(blind_id, "up")

@splitargs
@ids_for_names
def do_down(self, blinds):
"""
Usage:
up <SHUTTER> [<SHUTTER> <SHUTTER>...]

Lower one or several shutters. Accepts a list of shutter names sepatated by space.

Example:
duofern> up Livingroom
duofern> up Livingroom Kitchen
"""
for blind_id in blinds:
print("lowering {}".format(blinds[blind_id]))
self.stick.command(blind_id, "down")

@splitargs
def do_rename(self, args):
"""
Usage:
rename <NAME> <NEW_NAME>

Rename an actor. Write changes to config file when done.

Example:
duofern> rename 13f897 kitchen_west
"""
id = [device['id'] for device in self.stick.config['devices'] if device['name'] == args[0]]
if len(id)==0:
print("Please enter a valid device name for renaming.")
self.stick.set_name(id[0], args[1])
print("Set name for {} to {}".format(id[0], args[0]))

def refresh(self,args):
"""
Usage:
refresh

Refresh config file with current changes.

example:
duofern> refresh
"""
self.stick.sync_devices()

@splitargs
@ids_for_names
def do_on(self, blinds):
"""
Usage:
off <SWITCH1> [<SWITCH2> <SWITCH3>]

Switch on one or several switch actors. Accepts a list of actor names.

Example:
duofern> off Livingroom
duofern> off Livingroom Kitchen
"""
for blind_id in blinds:
print("lifting {}".format(blinds[blind_id]))
self.stick.command(blind_id, "up")

@splitargs
@ids_for_names
def do_off(self, blinds):
"""
Usage:
off <SWITCH1> [<SWITCH2> <SWITCH3>]

Switch off one or several switch actors. Accepts a list of actor names.

Example:
duofern> off Livingroom
duofern> off Livingroom Kitchen
"""
for blind_id in blinds:
print("lifting {}".format(blinds[blind_id]))
self.stick.command(blind_id, "up")


if __name__ == "__main__":
args = parser.parse_args()

# print(args.up)
if args.debug:
logging.basicConfig(format=' %(asctime)s: %(message)s', level=logging.DEBUG)
else:
Expand All @@ -291,10 +114,10 @@ def do_off(self, blinds):
stick.set_name(args.set_name[0], args.set_name[1])
stick.sync_devices()

print("\nThe following devices are configured:\n")
print("\n".join(
["id: {:6} name: {}".format(device['id'], device['name']) for device in stick.config['devices']]))
print("\n")
if args.show_paired:
print("\nThe following devices are configured:\n")
print("\n".join(["id: {:6} name: {}".format(device['id'], device['name']) for device in stick.config['devices']]))
print("\n")

if args.pair:
print("entering pairing mode")
Expand Down Expand Up @@ -333,41 +156,26 @@ def do_off(self, blinds):
if args.up:
stick._initialize()
stick.start()
time.sleep(1)
ids = [device['id'] for device in stick.config['devices'] if device['name'] in args.up]
for blind_id in ids:
stick.command(blind_id, "up")
time.sleep(0.5)
stick.command(blind_id, "up")
time.sleep(0.5)
stick.command(blind_id, "up")
time.sleep(2)
time.sleep(1)

if args.down:
stick._initialize()
stick.start()
time.sleep(1)
ids = [device['id'] for device in stick.config['devices'] if device['name'] in args.down]
for blind_id in ids:
stick.command(blind_id, "down")
time.sleep(0.5)
stick.command(blind_id, "down")
time.sleep(0.5)
stick.command(blind_id, "down")
time.sleep(2)
time.sleep(1)

if args.stop:
stick._initialize()
stick.start()
time.sleep(1)
ids = [device['id'] for device in stick.config['devices'] if device['name'] in args.stop]
for blind_id in ids:
stick.command(blind_id, "stop")
time.sleep(0.5)
stick.command(blind_id, "stop")
time.sleep(0.5)
stick.command(blind_id, "stop")
time.sleep(2)
time.sleep(1)

if args.on:
stick._initialize()
Expand Down Expand Up @@ -414,15 +222,10 @@ def do_off(self, blinds):
if args.position is not None:
stick._initialize()
stick.start()
time.sleep(1)
ids = [device['id'] for device in stick.config['devices'] if device['name'] in args.position[1:]]
for blind_id in ids:
stick.command(blind_id, "position", 100 - int(args.position[0]))
time.sleep(0.5)
stick.command(blind_id, "position", 100 - int(args.position[0]))
time.sleep(0.5)
stick.command(blind_id, "position", 100 - int(args.position[0]))
time.sleep(2)
time.sleep(1)

stick.stop()
time.sleep(1)
Expand Down
Loading