Skip to content

Commit

Permalink
Merge pull request #17 from WwwwwyDev/develop
Browse files Browse the repository at this point in the history
version 0.1.2
  • Loading branch information
WwwwwyDev authored May 9, 2024
2 parents afc934c + 53d8662 commit cf63fb2
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 30 deletions.
2 changes: 1 addition & 1 deletion crawlipt/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
# 88YbdP88 8P 88""" dP__Yb Yb 88"Yb dP__Yb Yb "88 88""
# 88 YY 88 dP 88 dP""""Yb YboodP 88 Yb dP""""Yb YboodP 888888

VERSION = (0, 1, 1)
VERSION = (0, 1, 2)

__version__ = '.'.join(map(str, VERSION))
2 changes: 1 addition & 1 deletion crawlipt/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def inner_wrapper(*args, **kwargs):
if type_.annotation == type_.empty:
raise ParamTypeError(f"Parameter {name} must be indicated the type.")
if name not in all_kwargs:
raise ParamTypeError(f"Parameter {name} is not in the defined parameter list.")
raise ParamTypeError(f"Parameter {name} is in the defined parameter list, but missing.")
if all_kwargs[name] is None and type_.default is not type_.empty:
continue
if all_kwargs[name] == "__PRE_RETURN__":
Expand Down
80 changes: 59 additions & 21 deletions crawlipt/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def get_dict(obj):
class ScriptProcess:
ACTIONS = get_dict(Action)
CONDITIONS = get_dict(Condition)
__POP_KEY = {"method", "next", "if", "check", "condition", "loop", "return_flag", "while"}
__POP_KEY = {"method", "next", "if", "check", "condition", "loop", "return_flag", "while", "fail_script"}

@staticmethod
@check
Expand All @@ -78,13 +78,20 @@ def __condition_check(temp_condition: dict, name: str, pre_deep: str, current_de
if condition not in ScriptProcess.CONDITIONS.keys():
msg = "(Deep %s) Could not found the Condition Method in '%s'" % (pre_deep + str(current_deep), name)
raise ScriptSyntaxError(ParamTypeError(msg), condition, pre_deep + str(current_deep))
fail_script = temp_condition.get("fail_script")
if fail_script:
ScriptProcess.syntax_check(script=fail_script,
pre_deep=pre_deep + str(current_deep) + "->",
return_record=return_record)
return_flag = temp_condition.get("return_flag")
if return_flag and not isinstance(return_flag, str):
msg = "(Deep %s) return_flag must be the type of str" % (pre_deep + str(current_deep))
raise ScriptSyntaxError(ParamTypeError(msg), condition, pre_deep + str(current_deep))
for key, value in temp_condition.items():
if key.lower() not in ScriptProcess.__POP_KEY:
temp_args[key] = value
if "store" in signature(ScriptProcess.CONDITIONS[condition]).parameters:
temp_args["store"] = None
if return_record:
for key, value in temp_args.items():
if isinstance(value, str) and value.startswith("__rf-") and value.endswith("__"):
Expand Down Expand Up @@ -141,8 +148,6 @@ def syntax_check(script: dict | str, pre_deep: str = "", return_record: dict = {
ScriptProcess.syntax_check(script=loop_script,
pre_deep=pre_deep + str(current_deep) + "->",
return_record=return_record)
script = script.get("next")
continue
check_condition = script.get("check")
if check_condition:
ScriptProcess.__condition_check(temp_condition=check_condition,
Expand All @@ -152,6 +157,9 @@ def syntax_check(script: dict | str, pre_deep: str = "", return_record: dict = {
current_deep=current_deep)
if_condition = script.get("if")
if if_condition:
if not script.get("method"):
msg = "(Deep %s) The 'if' condition must be with a Action Method" % (pre_deep + str(current_deep))
raise ScriptSyntaxError(ParamTypeError(msg), "", pre_deep + str(current_deep))
ScriptProcess.__condition_check(temp_condition=if_condition,
name="if",
return_record=return_record,
Expand All @@ -160,8 +168,8 @@ def syntax_check(script: dict | str, pre_deep: str = "", return_record: dict = {
temp_args = {"driver": None}
method = script.get("method")
if not method:
msg = "(Deep %s) Method is missing" % (pre_deep + str(current_deep))
raise ScriptSyntaxError(ParamTypeError(msg), "", pre_deep + str(current_deep))
script = script.get("next")
continue
if method not in ScriptProcess.ACTIONS.keys():
msg = "(Deep %s) Could not found the Action Method" % (pre_deep + str(current_deep))
raise ScriptSyntaxError(ParamTypeError(msg), method, pre_deep + str(current_deep))
Expand Down Expand Up @@ -210,20 +218,32 @@ def syntax_check(script: dict | str, pre_deep: str = "", return_record: dict = {

@staticmethod
@check
def __process_condition(temp_condition: dict, webdriver: WebDriver, return_record: dict) -> bool:
def __process_condition(temp_condition: dict, webdriver: WebDriver, return_record: dict,
global_script: dict, interval: float, wait: float, store: StoreBase = None) -> bool:
condition = temp_condition.get("condition")
return_flag = temp_condition.get("return_flag")
temp_args = {"driver": webdriver}
for key, value in temp_condition.items():
if key.lower() not in ScriptProcess.__POP_KEY:
temp_args[key] = value
if "store" in signature(ScriptProcess.CONDITIONS[condition]).parameters:
temp_args["store"] = store
if return_record:
for key, value in temp_args.items():
if isinstance(value, str) and value.startswith("__rf-") and value.endswith("__"):
temp_args[key] = return_record[value[5:-2]]
is_success = ScriptProcess.CONDITIONS[condition](**temp_args)
if return_flag:
return_record[return_flag] = is_success
if not is_success:
fail_script = temp_condition.get("fail_script")
if fail_script:
ScriptProcess._process_script(script=fail_script,
global_script=global_script,
webdriver=webdriver,
interval=interval,
wait=wait,
store=store,)
return is_success

@staticmethod
Expand All @@ -244,7 +264,11 @@ def _process_script(script: dict, global_script: dict, webdriver: WebDriver, sto
if while_condition and cnt:
while ScriptProcess.__process_condition(temp_condition=while_condition,
return_record=return_record,
webdriver=webdriver) and cnt:
global_script=global_script,
webdriver=webdriver,
store=store,
interval=interval,
wait=wait) and cnt:
ScriptProcess._process_script(script=loop_script,
global_script=global_script,
webdriver=webdriver,
Expand All @@ -253,21 +277,21 @@ def _process_script(script: dict, global_script: dict, webdriver: WebDriver, sto
interval=interval,
wait=wait)
cnt -= 1
script = script.get("next")
continue
if while_condition:
while ScriptProcess.__process_condition(temp_condition=while_condition,
return_record=return_record,
webdriver=webdriver):
global_script=global_script,
webdriver=webdriver,
store=store,
interval=interval,
wait=wait):
ScriptProcess._process_script(script=loop_script,
global_script=global_script,
webdriver=webdriver,
return_record=return_record,
store=store,
interval=interval,
wait=wait)
script = script.get("next")
continue
if cnt:
for _ in range(cnt):
ScriptProcess._process_script(script=loop_script,
Expand All @@ -277,25 +301,34 @@ def _process_script(script: dict, global_script: dict, webdriver: WebDriver, sto
store=store,
interval=interval,
wait=wait)
script = script.get("next")
continue
check_condition = script.get("check")
if check_condition:
is_success = ScriptProcess.__process_condition(temp_condition=check_condition,
return_record=return_record,
webdriver=webdriver)
global_script=global_script,
webdriver=webdriver,
store=store,
interval=interval,
wait=wait)
if not is_success:
return
if_condition = script.get("if")
if if_condition:
is_success = ScriptProcess.__process_condition(temp_condition=if_condition,
return_record=return_record,
webdriver=webdriver)
global_script=global_script,
webdriver=webdriver,
store=store,
interval=interval,
wait=wait)
if not is_success:
script = script.get("next")
continue
temp_args = {"driver": webdriver}
method = script.get("method")
if not method:
script = script.get("next")
continue
for key, value in script.items():
if key.lower() not in ScriptProcess.__POP_KEY:
temp_args[key] = value
Expand Down Expand Up @@ -345,7 +378,7 @@ def _replace_variable(script: dict, variable: VariableBase) -> None:
raise VariableError(msg)
script[key] = variable.get(variable_name)
for key in script.keys():
if key in {"loop", "if", "check", "while", "script"}:
if key in {"loop", "if", "check", "while", "script", "fail_script"}:
ScriptProcess._replace_variable(script=script[key], variable=variable)
script = script.get("next")

Expand All @@ -362,10 +395,15 @@ def generate(scripts: list | dict | str) -> dict:
res = {}
temp = res
for i in range(len(scripts)):
if "loop" in scripts[i].keys():
loop_temp = scripts[i]["loop"]
if "script" in loop_temp:
loop_temp["script"] = ScriptProcess.generate(loop_temp["script"])
for k in scripts[i].keys():
if k == "loop":
loop_temp = scripts[i]["loop"]
if "script" in loop_temp:
loop_temp["script"] = ScriptProcess.generate(loop_temp["script"])
if k in {"if", "check", "while"}:
judge_temp = scripts[i][k]
if "fail_script" in judge_temp:
judge_temp["fail_script"] = ScriptProcess.generate(judge_temp["fail_script"])
temp.update(scripts[i])
if i != len(scripts) - 1:
while temp.get("next"):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EMAIL = 'wwy20001014@foxmail.com'
AUTHOR = 'WwwwwyDev'
REQUIRES_PYTHON = '>=3.10.0'
VERSION = '0.1.1'
VERSION = '0.1.2'

# What packages are required for this module to be executed?
REQUIRED = [
Expand Down
11 changes: 5 additions & 6 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ def checkNum(driver: WebDriver, xpath: str) -> bool:
"method": "redirect",
"url": "https://www.bchrt.com/tools/click-counter/",
}, {
"method": "getAttribute",
"xpath": "//*[@id=\"count\"]",
"name": "value",
"loop": {
"while": {
"condition": "checkNum",
Expand All @@ -282,26 +285,22 @@ def checkNum(driver: WebDriver, xpath: str) -> bool:
}
]
}
}, {
"method": "getAttribute",
"xpath": "//*[@id=\"count\"]",
"name": "value"
}]
json_str = cpt.Script.generate_json(step)
res = cpt.Script(json_str)(webdriver)
print(res)
webdriver.quit()

def test_conditions(self):
webdriver = get_driver()
# webdriver = get_driver()
step = [{
"method": "redirect",
"url": "https://www.baidu.com/",
}, {
"method": "input",
"xpath": "//*[@id=\"kw\"]",
"text": "your search text",
"if": {
"check": {
"condition": "presence",
"xpath": "//*[@id=\"su\"]"
}
Expand Down

0 comments on commit cf63fb2

Please sign in to comment.