Thomas Hoeck
07/21/2020, 7:33 AM
Unexpected error: ZeroDivisionError('division by zero')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 952, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 182, in timeout_handler
    return fn(*args, **kwargs)
  File "<ipython-input-6-39643be0b44f>", line 9, in hello_task
ZeroDivisionError: division by zero
I know that under the Hybrid model code isn't shared, but is there a way to get a normal Python traceback so I can see the failing line?
Sven Teresniak
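A general note on the question above: when the failure can be reproduced locally (e.g. with flow.run()), the standard-library traceback module recovers the full normal traceback, including the failing file and line. A minimal sketch, with a stand-in hello_task body matching the error above:

```python
import traceback

def hello_task():
    # stand-in for the task body that raised in the flow above
    return 1 / 0

try:
    hello_task()
except ZeroDivisionError:
    # format_exc() returns the full "normal" Python traceback as a string,
    # naming the file, the function, and the exact line that raised
    tb = traceback.format_exc()

print(tb)
```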
07/21/2020, 8:22 AM
Were the stored_as_script and path keyword arguments removed? From every storage class? (Local in my case)
Sven Teresniak
07/21/2020, 9:48 AM
A question about the context dict: the documentation is not much more than a hello-world. I'd like to use Prefect's Context to pass some (configuration) constants to tasks, but that doesn't seem possible.
#!/usr/bin/env python
# coding: utf8
import prefect
from prefect import task, Flow
from prefect.environments.storage.local import Local

@task
def print_context():
    prefect.context.get("logger").info("get-val is '%s'", prefect.context.get("val"))
    prefect.context.get("logger").info("dot-val is '%s'", prefect.context.val)

with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow:
    with prefect.context(val="SPAM"):
        print_context()

if __name__ == "__main__":
    flow.register()
This will print 'None' in the first line and throw an exception in the second. Thus, the context's val is only valid inside the with block. But what's the purpose of Context if not to pass some simple constants around?
I can also write

with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow:
    prefect.context.val = "SPAM"
    print_context()

with the same result: the value is not available in the task.
Luis Muniz
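The observed behavior is consistent with the context being a scoped overlay: values set inside a with block are discarded when the block exits, which happens at flow-definition time, long before any task runs. A minimal stdlib sketch of that semantics; the ScopedContext class below is a hypothetical illustration, not Prefect's implementation:

```python
from contextlib import contextmanager

class ScopedContext:
    """A dict-like context whose `with`-scoped values vanish on exit."""

    def __init__(self):
        self._stack = [{}]

    def get(self, key, default=None):
        # search the innermost scope first
        for scope in reversed(self._stack):
            if key in scope:
                return scope[key]
        return default

    @contextmanager
    def __call__(self, **kwargs):
        # push a new scope for the duration of the `with` block
        self._stack.append(kwargs)
        try:
            yield self
        finally:
            self._stack.pop()

ctx = ScopedContext()
with ctx(val="SPAM"):
    inside = ctx.get("val")   # "SPAM" while the block is active
outside = ctx.get("val")      # None: the scope was popped on exit
```

This is why a value set in a with block around flow definition is gone when the task later executes; the value has to be supplied at run time instead.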
07/21/2020, 10:04 AM
@task
def get_data(chunk_size):
    # fetch connection
    curr = connection.cursor()
    curr.execute(sql.SQL("select id from big_table"))
    collection = curr.fetchmany(chunk_size)
    while collection:
        # spawn task(collection)  <----- here spawn a task
        collection = curr.fetchmany(chunk_size)
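The loop above can be phrased as a plain generator over the DB-API's fetchmany, which is also the shape a downstream map would consume. A minimal stdlib sketch (sqlite3 stands in for the real database; the table and names are illustrative):

```python
import sqlite3

def iter_chunks(cursor, chunk_size):
    """Yield lists of rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows

# toy in-memory database standing in for "big_table"
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE big_table (id INTEGER)")
conn.executemany("INSERT INTO big_table VALUES (?)", [(i,) for i in range(10)])

cur = conn.cursor()
cur.execute("SELECT id FROM big_table")
chunks = list(iter_chunks(cur, 3))  # 10 rows in chunks of 3: sizes 3, 3, 3, 1
```

Each yielded chunk is an independent unit of work, so instead of spawning a task inside the loop, the full list of chunks can be handed to a mapped task.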
Ben Davison
07/21/2020, 12:36 PM
Preston Marshall
07/21/2020, 1:40 PM
Sven Teresniak
07/21/2020, 3:52 PM
Florian L
07/21/2020, 3:59 PM
Chris Goddard
07/21/2020, 5:40 PM
Pedro Machado
07/21/2020, 5:43 PM
scheduled_start_time?
More generally, it would be great to be able to use context + parameters to define the flow run name.
Matt Wong-Kemp
07/21/2020, 7:24 PM
async def do_thing_async_impl(a, b, c):
    await ...
    ...

@task
def do_thing(a, b, c):
    return asyncio.get_event_loop().run_until_complete(do_thing_async_impl(a, b, c))

but I'm getting an error: Future <Future pending cb=[BaseSelectorEventLoop._sock_connect_done(9)()]> attached to a different loop
Before I go digging into event-loop fun, is there an easier way to do an async task?
karteekaddanki
07/21/2020, 10:27 PM
My tasks are implemented in C++ and are invoked via Python. How can I use targets effectively in this case? I've tried returning None, but as expected this causes my task to always run. I've worked around the issue with a layer of indirection: the target is a file that contains the filename of the actual file generated by my C++ program (similar to empty targets in make: the presence of this file indicates that the task has run, and its contents point to the location of the task output).
It would be nice if I could avoid this indirection and directly return a Result object that doesn't necessarily correspond to a serialized Python object. All downstream processes that consume these results treat them as locations. In other words, I'm looking for behavior identical to Luigi. Thanks in advance.
Thomas La Piana
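For reference, the sentinel-file workaround described above (an "empty target" in the make sense) can be sketched in plain Python. The names here are illustrative, and this is not Prefect's Result API:

```python
from pathlib import Path
import tempfile

def run_if_needed(sentinel: Path, build):
    """Skip `build` when the sentinel exists; otherwise run it and
    record the real output location in the sentinel (make-style
    empty target)."""
    if sentinel.exists():
        return Path(sentinel.read_text())
    output = build()
    sentinel.write_text(str(output))
    return output

workdir = Path(tempfile.mkdtemp())
sentinel = workdir / "task.done"
calls = []

def build():
    # stand-in for the external C++ program producing a file
    out = workdir / "result.bin"
    out.write_bytes(b"\x00")
    calls.append(1)
    return out

first = run_if_needed(sentinel, build)
second = run_if_needed(sentinel, build)  # skipped: sentinel already exists
```

Downstream consumers only ever see the path stored in the sentinel, which matches the "results as locations" behavior described above.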
07/22/2020, 5:29 AM
Adrien Boutreau
07/22/2020, 8:56 AM
Matias Godoy
07/22/2020, 9:54 AM
Iain Dillingham
07/22/2020, 10:09 AM
PREFECT__ environment variables, so I'm using defaults.
Lance Haig
07/22/2020, 10:20 AM
Marwan Sarieddine
07/22/2020, 1:28 PM
Sven Teresniak
07/22/2020, 2:00 PM
Richard Hughes
07/22/2020, 2:48 PM
Matt Wong-Kemp
07/22/2020, 3:12 PM
Michael C. Grant
07/22/2020, 3:49 PM
Shawn Marhanka
07/22/2020, 9:59 PM
client.create_flow_run(flow_id, parameters=parameters), but I cannot find how to get the flow_id without registering. Is there a way to get all of the flow mappings (name + id) from a Cloud project and then use that when creating flow runs? Or am I going about this all wrong? Thanks for the help.
James Bennett Saxon
07/23/2020, 1:58 AM
Calling MySqlFetch.run() gives:
ERROR:prefect.TaskRunner:Unexpected error: AttributeError('__enter__',)
I didn't want to get into debugging this because my code could be totally wrong. I was hoping to find some examples of using this and other Prefect tasks. Are there examples of how to use them?
Sven Teresniak
07/23/2020, 7:59 AMdef complex_task_generating_function(singleelement):
case(sometask, foo):
anothertask(…)
…
with Flow("foo") as flow:
param = Parameter("param", required=False)
# generates a list of strings, based on param. len is 0…n
elements_to_process = maybe_generate_work_items(param)
# when this evaluates to False, all the following is skipped, the apply_map as well!
case(isempty_task(elements_to_process), True):
# now I either want to add one default element or
# somehow do the processing based on the following result
generated_default = default_value_generator_task()
# maybe so?
elements_to_process = task(lambda x: [x])(generated_default)
# now the tricky part.
# elements_to_process is either a list or just one (runtime dependent) default value
result = apply_map(complex_task_generating_function, elements_to_process)
Problem is: apply_map does not know skip_on_upstream_skip. I cannot just use map() because complex_task_generating_function is not a task (it's the beef of the flow, so to speak, and in fact the logic of the flow). I found a workaround by doing something like this:
@task(name="hack", skip_on_upstream_skip=False)
def merger_hack(elements, default):
    return elements or [default]

with Flow("foo") as flow:
    param = Parameter("param", required=False)  # same as above
    elements_to_process = maybe_generate_work_items(param)  # same as above
    with case(isempty_task(elements_to_process), True):
        generated_default = default_value_generator_task()
    final_list = merger_hack(elements_to_process, generated_default)
    result = apply_map(complex_task_generating_function, final_list)
But writing code like the hack task, which basically checks whether or not the flow ran through the isempty case, seems odd. I don't want to "check" which path the flow took; the run path through the flow should decide this. How can I write this elegantly and simply?
Sorry for the long question, but I want to learn how to use Prefect properly because I'm going to write a lot of flows in the future.
bruno.corucho
07/23/2020, 9:32 AMdf = read_sql_table(table='peanuts', uri=connection,
index_col="peanut_id", columns=["peanut_details, peanut_date"],
npartitions=1000)
Should I return these partitions and do the delaying and computing using a Prefect's map() from within the flow's scope definition?
Thanks in advance! 🙂 And have a great weekend!Sven Teresniak
07/23/2020, 11:41 AM
case. The ifelse task doesn't seem to fit.
Thomas Hoeck
07/23/2020, 12:16 PM
Klemen Strojan
07/23/2020, 12:18 PM
Adam
07/23/2020, 4:50 PM
Adam
07/23/2020, 4:50 PM
Laura Lorenz (she/her)
07/23/2020, 5:00 PM
Joe Schmid
07/23/2020, 5:07 PM
Adam
07/27/2020, 9:08 AM
Laura Lorenz (she/her)
07/27/2020, 2:58 PM
https://youtu.be/50S4RqeEVVo
Adam
07/27/2020, 3:21 PM
Laura Lorenz (she/her)
07/27/2020, 10:59 PM
Adam
07/28/2020, 1:34 PM