Christian Eik
11/29/2019, 11:49 AM
CreateContainer works fine
Christian Eik
11/29/2019, 11:50 AM
itay livni
11/29/2019, 6:31 PM
ifelse the False branch is a pass. calced_def_df = calced_def_df.copy()
Is that the right way to implement this logic? I am getting a UserWarning: You are making a copy of a task that has dependencies on or to other tasks in the active flow context. The copy will not retain those dependencies.
alvin goh
12/01/2019, 9:14 AM
from prefect import Flow, Parameter

with Flow("math") as f:
    x = Parameter("x")
    d = x['d']
    a = d + 1

flow_state = f.run(x={'d': 7})
print(flow_state)  # success

# create new flow and copy old flow
new_flow = Flow(name="Test")
new_flow.update(f)
flow_state2 = new_flow.run(x={'d': 7})
print(flow_state2)  # fails
Stacktrace:
ERROR - prefect.TaskRunner | Unexpected error: TypeError("run() missing 1 required positional argument: 'key'",)
Traceback (most recent call last):
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/utilities/executors.py", line 85, in timeout_handler
    return fn(*args, **kwargs)
TypeError: run() missing 1 required positional argument: 'key'
David Ojeda
12/02/2019, 11:52 AM
.run method is called with the same instance but different input values. In other words, if I change a member variable inside .run, this change is available to the next run call.
Perhaps a minimal example can explain the situation a bit better:
from prefect import task, Task, Flow

@task
def generate_numbers():
    return list(range(10))

class MyTask(Task):
    def __init__(self, *, value=None, **kwargs):
        super().__init__(**kwargs)
        # store the constructor argument (defaults to None)
        self.value = value

    def run(self, *, number):
        print(f'Hello I am {hex(id(self))} number is {number}')
        if self.value is None:
            self.value = number
        else:
            print(f'What? Who set this value={self.value}?')
        return number + 1

instance = MyTask()
with Flow("My First Flow") as flow:
    n = generate_numbers()
    n1 = instance.map(number=n)
flow.run()
This outputs:
[2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': Starting task run...
[2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': finished task run for task with final state: 'Success'
[2019-12-02 11:46:43,167] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2019-12-02 11:46:43,168] INFO - prefect.TaskRunner | Task 'MyTask[8]': Starting task run...
Hello I am 0x11ddaef50 number is 8
[2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[8]': finished task run for task with final state: 'Success'
[2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[1]': Starting task run...
Hello I am 0x11ddaef50 number is 1
What? Who set this value=8?
...
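The output above can be reproduced without Prefect. Below is a minimal sketch in plain Python that assumes nothing about Prefect's internals: `StatefulStep` and `stateless_step` are hypothetical names used only for illustration, and the list comprehension stands in for whatever calls `run` once per mapped input. It shows that an attribute set on a shared instance during one call is still visible in the next call, and that keeping per-call state in local variables avoids this.

```python
class StatefulStep:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self):
        self.value = None

    def run(self, number):
        # Report what an earlier call left behind, then record our own input
        # only if nothing has been stored yet (same logic as MyTask.run).
        seen_before = self.value
        if self.value is None:
            self.value = number
        return number + 1, seen_before


def stateless_step(number):
    # Per-call state lives in local variables, so calls cannot affect each other.
    seen_before = None
    return number + 1, seen_before


shared = StatefulStep()
stateful_results = [shared.run(n) for n in [8, 1, 3]]
stateless_results = [stateless_step(n) for n in [8, 1, 3]]

print(stateful_results)   # [(9, None), (2, 8), (4, 8)]
print(stateless_results)  # [(9, None), (2, None), (4, None)]
```

Whether separate workers would each see their own copy of the instance depends on the executor, which this sketch does not model.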
I assume that this is specific to the local executor.
In general, I don’t think this is a problem, but I was wondering if there is any documentation / warning somewhere in the docs that I may have missed?
Luke Orland
12/02/2019, 4:15 PM
PtitPoulpe
12/02/2019, 4:29 PM
PtitPoulpe
12/02/2019, 4:30 PM
itay livni
12/02/2019, 8:23 PM
flow_state.result[ifelse_return].result
alvin goh
12/03/2019, 6:50 AM
from prefect import Flow, Parameter, task
from prefect.tasks.core.constants import Constant
from prefect.engine.signals import FAIL

@task
def trythis(x, fail=False):
    if fail:
        raise FAIL()
    return [k+1 for k in x]

@task
def tryprint(x):
    print(x)

with Flow("math") as f:
    x = Parameter("x")
    y = trythis(x, fail=True)
    y1 = trythis(y, fail=False)
    z = tryprint.map(y1)

flow_state = f.run(x=[1,2,3,4,5,6,7])
print(flow_state.result[z].result)
Output:
[2019-12-03 06:50:35,756] INFO - prefect.FlowRunner | Beginning Flow run for 'math'
[2019-12-03 06:50:35,758] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-03 06:50:35,765] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2019-12-03 06:50:35,769] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2019-12-03 06:50:35,775] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,780] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,786] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,789] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'TriggerFailed'
[2019-12-03 06:50:35,795] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
[2019-12-03 06:50:35,796] ERROR - prefect.TaskRunner | Task 'tryprint': unexpected error while running task: TypeError("'TRIGGERFAIL' object does not support indexing",)
Traceback (most recent call last):
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 259, in run
    executor=executor,
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 719, in run_mapped_task
    upstream_state.result[i],
TypeError: 'TRIGGERFAIL' object does not support indexing
[2019-12-03 06:50:35,800] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,802] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
'TRIGGERFAIL' object does not support indexing
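The traceback above fails at `upstream_state.result[i]`, which suggests the mapped task tries to index the upstream result even though that result is a failure object rather than a list. Below is a minimal sketch of that failure mode in plain Python, not Prefect's actual code: `TriggerFail`, `naive_map`, and `guarded_map` are hypothetical names, and `guarded_map` shows just one possible design, in which an upstream failure is passed through instead of being indexed.

```python
class TriggerFail(Exception):
    """Hypothetical stand-in for the failure object stored as an upstream result."""


def naive_map(fn, upstream_result, n_children):
    # Index the upstream result once per mapped child, as the traceback suggests.
    return [fn(upstream_result[i]) for i in range(n_children)]


def guarded_map(fn, upstream_result):
    # Pass an upstream failure through unchanged instead of trying to index it.
    if isinstance(upstream_result, Exception):
        return upstream_result
    return [fn(item) for item in upstream_result]


def add_one(value):
    return value + 1


failure = TriggerFail("upstream task failed")

try:
    naive_map(add_one, failure, 1)
    naive_error = None
except TypeError as exc:
    # Exception instances cannot be indexed, so the naive version raises here.
    naive_error = exc

guarded_failure = guarded_map(add_one, failure)
guarded_values = guarded_map(add_one, [1, 2, 3])

print(type(naive_error).__name__)  # TypeError
print(guarded_failure is failure)  # True
print(guarded_values)              # [2, 3, 4]
```

This sketch treats the upstream result as all-or-nothing; it does not cover the case where only some mapped children of an upstream task failed.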
Seems like the mapped task should also raise TriggerFailed if the upstream tasks failed. Maybe it's not so simple, as it may be just a subset of the upstream tasks that failed. What is the design philosophy on this?
Dan Egan
12/03/2019, 8:31 PM
itay livni
12/04/2019, 1:58 PM
flow with mapping
The results of the map are read into another function, munge_dfs.
That edge is not defined in the visualization; it is represented as some id. Is this something Prefect or I have control over? It makes reading the visualization confusing, and even more so with multiple maps in a flow.
itay livni
12/04/2019, 1:59 PM
alexandre kempf
12/04/2019, 4:13 PM
@task
def return_2_values(x, y):
    return x, y

with Flow("lolilol") as flow:
    a, b = return_2_values(x=2, y=4)
But I can't, because tasks are not iterable 😕
The only trick I found is to use a dictionary inside my task and then use GetItem. Unfortunately, with this method, I need to know what is inside "return_values" at the level of the Flow in order to get the correct keys. Is there a way to do it without prior information on the task? Like a normal Python function 🙂
Alex Post
12/04/2019, 6:18 PM
with Flow('Kafka Data Quality Test') as flow:
    full_run = Parameter('full_run', default=False)
flow.run(parameters=dict(full_run=True))
and I keep getting this error:
Traceback (most recent call last):
  File "src/app.py", line 243, in <module>
    flow.run(parameters=dict(full_run=True))
  File "/Users/apost/ccde/ccde-kafka-data-quality/venv/lib/python3.7/site-packages/prefect/core/flow.py", line 991, in run
    fmt_params
ValueError: Flow.run received the following unexpected parameters: full_run
Has anyone seen this before?
Dylan
LOOP signals in your Flows, would you be up to sharing what you use them for? We’re considering a change to the behavior and we’re interested in your thoughts /thread
Nat Busa
12/05/2019, 4:54 AM
ARun
12/05/2019, 6:42 PM
[2019-12-05 18:38:18,226] INFO - prefect.Flow: sf -> sftp | Waiting for next scheduled run at 2019-12-05T21:00:00+00:00
Luke Orland
12/06/2019, 4:51 PM
run() method of tasks in the workflow.
Kevin Hill
12/06/2019, 6:05 PM
LOOP API?
Nat Busa
12/06/2019, 6:08 PM
Kevin Hill
12/06/2019, 8:25 PM
Kevin Hill
12/06/2019, 8:26 PM
Kevin Hill
12/06/2019, 8:44 PM
Kevin Hill
12/06/2019, 8:45 PM
alvin goh
12/07/2019, 3:25 AM
alvin goh
12/08/2019, 7:40 AM
DiffyBron
12/08/2019, 1:52 PM
DiffyBron
12/09/2019, 5:18 AM
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

my_task = ShellTask(helper_script="cd /etc")

@task
def print_output(output):
    print(output)

with Flow("My shell") as flow:
    result = my_task(command='ls -lat')
    print_output(result)

if __name__ == '__main__':
    out = flow.run()
Nat Busa
12/09/2019, 7:01 AM