Blake List
08/24/2021, 11:33 PM
import time
import prefect
from prefect import task, Flow
from prefect.tasks.redis import RedisSet, RedisGet
from prefect.tasks.control_flow import ifelse
redis_get = RedisGet(...)
redis_set = RedisSet(...)
with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    # true case
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    # false case
    t1_set = redis_set(redis_key='t1_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    ifelse(case(t1_get, None), t1_init, t1_set)
flow.run()
Running once then doing
RedisGet(...).run(redis_key='t1_key')
gives
b'0000-00-00 00:00:00'
but running again gives the same value.
Any ideas why I am not getting the False case on the second run?
Kevin Kho
case might have a different usage. Can you try replacing case(t1_get, None) with task(lambda x: (x == None))(t1_get)?
Blake List
08/25/2021, 12:43 AM
Blake List
08/25/2021, 2:00 AM
with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    t2_get = redis_get(redis_key='t2_key')
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    t1_set = redis_set(redis_key='t1_key', redis_val=t2_get)
    ifelse(task(lambda x: (x == None))(t1_get), t1_init, t1_set)
    redis_set(redis_key='t2_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
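A note on the condition used above: the logs further down this thread show an as_bool task running ahead of the two CompareValue tasks, so ifelse appears to branch on the truthiness of whatever it is handed. Here is a minimal plain-Python sketch (no Prefect or Redis required; is_missing is a hypothetical helper name) of why an explicit None check is safer than truthiness for a value that comes back as either None or bytes:

```python
def is_missing(value):
    """Return True only when the key was absent (the lookup returned None)."""
    return value is None


# Values a Redis GET can plausibly hand back: None for a missing key, bytes otherwise.
samples = [None, b"", b"0000-00-00 00:00:00"]

for value in samples:
    # bool(value) treats an empty bytes string like a missing key; is_missing does not.
    print(repr(value), "truthy:", bool(value), "missing:", is_missing(value))
```

`value is None` is the idiomatic spelling of the `x == None` test in the flow; for None and bytes values both give the same answer.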
I need
redis_set(redis_key='t2_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
to wait for
ifelse(task(lambda x: (x == None))(t1_get), t1_init, t1_set)
to run.
Kevin Kho
Blake List
08/25/2021, 4:35 AM
with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    t2_get = redis_get(redis_key='t2_key')
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    t1_set = redis_set(redis_key='t1_key', redis_val=t2_get)
    cond = ifelse(task(lambda x: (x == None))(t1_get), t1_init, t1_set)
    redis_set(redis_key='t2_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), upstream_tasks=[cond])
###############################
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
flow.run()
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
flow.run()
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
With output:
None
None
[2021-08-25 16:34:13+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Skipped'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Skipped'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
b'0000-00-00 00:00:00'
b'2021-08-25 16:31:41'
[2021-08-25 16:34:13+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Skipped'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Skipped'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 16:34:13+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 16:34:13+1200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
b'2021-08-25 16:31:41'
b'2021-08-25 16:31:41'
Kevin Kho
When CompareValue is False in your graph, it raises a SKIP, which makes the downstream task SKIP as well. I think you need to do RedisSet(…, skip_on_upstream_skip=False) when you initialize it.
Blake List
08/25/2021, 5:31 AM
Adding skip_on_upstream_skip=False to RedisSet yields:
None
None
[2021-08-25 17:29:44+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Skipped'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] ERROR - prefect.TaskRunner | Task 'RedisSet': Exception encountered during task execution!
Traceback (most recent call last):
File "/workspace/appscratch/miniconda/cflbwl_black/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/workspace/appscratch/miniconda/cflbwl_black/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/workspace/appscratch/miniconda/cflbwl_black/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 441, in method
return run_method(self, *args, **kwargs)
File "/workspace/appscratch/miniconda/cflbwl_black/lib/python3.7/site-packages/prefect/tasks/redis/redis_tasks.py", line 101, in run
raise ValueError("redis_key and redis_val must be provided")
ValueError: redis_key and redis_val must be provided
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Failed'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
b'0000-00-00 00:00:00'
b'2021-08-25 17:29:42'
[2021-08-25 17:29:44+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Skipped'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-25 17:29:44+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-25 17:29:44+1200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
b'2021-08-25 17:29:42'
b'2021-08-25 17:29:42'
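The two runs above differ only in what Redis already holds. Below is a minimal plain-Python sketch of one plausible reading of the first-run failure. It assumes, as the traceback suggests, that RedisSet rejects a None value, and that skip_on_upstream_skip=False lets a task run even when its branch was not selected. toy_redis_set and run_branch are hypothetical names; this is a toy model, not Prefect's implementation:

```python
def toy_redis_set(store, redis_key, redis_val):
    """Stand-in for a set operation that refuses missing arguments."""
    if redis_key is None or redis_val is None:
        raise ValueError("redis_key and redis_val must be provided")
    store[redis_key] = redis_val


def run_branch(store, branch_selected, skip_on_upstream_skip, redis_key, redis_val):
    """Return the final state of a branch task in this toy model."""
    if not branch_selected and skip_on_upstream_skip:
        return "Skipped"  # default behaviour: the skip propagates downstream
    try:
        toy_redis_set(store, redis_key, redis_val)
    except ValueError:
        return "Failed"
    return "Success"


# First run: t2_key is still empty, so the false branch would copy None into t1_key.
store = {}
t2_value = store.get("t2_key")  # None on the first run

default_state = run_branch(store, False, True, "t1_key", t2_value)
forced_state = run_branch(store, False, False, "t1_key", t2_value)
print(default_state, forced_state)
```

In this model the unselected branch fails only while t2_key is empty, which is consistent with the second run above, where every RedisSet succeeds.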
Kevin Kho
What about not using ifelse and instead switching to case?
Kevin Kho
from prefect import case
with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    t2_get = redis_get(redis_key='t2_key')
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    t1_set = redis_set(redis_key='t1_key', redis_val=t2_get)
    cond = task(lambda x: (x == None))(t1_get)
    with case(cond, True):
        .....
    with case(cond, False):
        redis_set(redis_key='t2_key', redis_val=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), upstream_tasks=[cond])
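To make the branching in this sketch concrete without Prefect or Redis, here is a toy model in plain Python. ToyFlow, its case method, and the dict standing in for Redis are all hypothetical. The only behaviour it tries to imitate is that tasks created inside a case block run when the condition's result equals the given value and are skipped otherwise:

```python
from contextlib import contextmanager


class ToyFlow:
    """Hypothetical stand-in for a flow; not Prefect's implementation."""

    def __init__(self):
        self.steps = []      # (name, func, guard) in registration order
        self._guard = None   # (condition name, expected value) inside a case block

    @contextmanager
    def case(self, cond_name, expected):
        previous, self._guard = self._guard, (cond_name, expected)
        try:
            yield
        finally:
            self._guard = previous

    def add(self, name, func):
        self.steps.append((name, func, self._guard))

    def run(self):
        results, states = {}, {}
        for name, func, guard in self.steps:
            # Skip a guarded step when the condition's result differs from the expected value.
            if guard is not None and results.get(guard[0]) != guard[1]:
                states[name] = "Skipped"
                continue
            results[name] = func()
            states[name] = "Success"
        return states


def build_flow(store):
    flow = ToyFlow()
    flow.add("cond", lambda: store.get("t1_key") is None)
    with flow.case("cond", True):
        flow.add("t1_init", lambda: store.update(t1_key="0000-00-00 00:00:00"))
    with flow.case("cond", False):
        flow.add("t1_set", lambda: store.update(t1_key=store["t2_key"]))
    return flow


store = {"t2_key": "2021-08-25 16:31:41"}
first = build_flow(store).run()
second = build_flow(store).run()
print(first)
print(second)
```

The first run takes the True branch because t1_key is missing; the second takes the False branch and copies t2_key into t1_key.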
Kevin Kho
Blake List
08/25/2021, 9:50 PM
Blake List
08/25/2021, 10:33 PM
import time
import prefect
from prefect import task, Flow
from prefect.tasks.redis import RedisSet, RedisGet, RedisExecute
from prefect.tasks.control_flow import ifelse
redis_get = RedisGet(host=myhost) # password is set as an env variable.
redis_set = RedisSet(host=myhost) # notice that skip_on_upstream_skip=False is not used.
@task
def get_current_time():
    logger = prefect.context.get("logger")
    time_now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    logger.info(f"Current time: {time_now}")
    return time_now
with Flow("flow_p") as flow:
    t1_get = redis_get(redis_key='t1_key')
    t2_get = redis_get(redis_key='t2_key')
    t1_init = redis_set(redis_key='t1_key', redis_val="0000-00-00 00:00:00")
    t1_set = redis_set(redis_key='t1_key', redis_val=t2_get)
    cond = ifelse(task(lambda x: (x == None))(t1_get), t1_init, t1_set)
    # cond is passed as an upstream task to ensure it runs after. note that it is
    # not needed for success if testing locally.
    redis_set(redis_key='t2_key', redis_val=get_current_time, upstream_tasks=[cond])
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
flow.run()
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
time.sleep(10)
flow.run()
print(RedisGet(host=myhost).run(redis_key='t1_key'))
print(RedisGet(host=myhost).run(redis_key='t2_key'))
output:
None
None
[2021-08-26 10:29:15+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Skipped'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'get_current_time': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.get_current_time | Current time: 2021-08-26 10:29:15
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'get_current_time': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Skipped'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:15+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:15+1200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
b'0000-00-00 00:00:00'
b'2021-08-26 10:29:15'
[2021-08-26 10:29:25+1200] INFO - prefect.FlowRunner | Beginning Flow run for 'flow_p'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisGet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'as_bool': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'as_bool': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "True"': Finished task run for task with final state: 'Skipped'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'CompareValue: "False"': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'get_current_time': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.get_current_time | Current time: 2021-08-26 10:29:25
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'get_current_time': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Skipped'
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Starting task run...
[2021-08-26 10:29:25+1200] INFO - prefect.TaskRunner | Task 'RedisSet': Finished task run for task with final state: 'Success'
[2021-08-26 10:29:25+1200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
b'2021-08-26 10:29:15'
b'2021-08-26 10:29:25'
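The change that appears to make t2_key advance between runs is get_current_time. In the earlier versions, time.strftime(...) was an ordinary argument, so Python evaluated it once while the flow was being defined, and every flow.run() reused that string. Here is a minimal plain-Python sketch of the difference. make_flow_with_value and make_flow_with_task are hypothetical helpers, and a counter stands in for the clock so the result is deterministic:

```python
import itertools

_ticks = itertools.count(1)


def fake_now():
    """Stand-in for time.strftime(...): returns a new value on every call."""
    return f"tick-{next(_ticks)}"


def make_flow_with_value(value):
    """The argument is already a plain string when the 'flow' is built."""
    return lambda: value


def make_flow_with_task(func):
    """The callable is stored and only invoked when the 'flow' runs."""
    return lambda: func()


eager_flow = make_flow_with_value(fake_now())  # fake_now() runs here, once
lazy_flow = make_flow_with_task(fake_now)      # fake_now runs on each flow run

eager_results = [eager_flow(), eager_flow()]
lazy_results = [lazy_flow(), lazy_flow()]
print(eager_results)
print(lazy_results)
```

The eager variant returns the same value on both runs, like the repeated 16:31:41 timestamp earlier in the thread, while the lazy variant produces a fresh value per run.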
Thank you very much for all your help! 😁
Kevin Kho