Sven Teresniak07/21/2020, 9:48 AM
dict. The documentation is not more than a hello-world. I'd like to use Prefect's Context to pass some (configuration) constants to task but that's not possible.
#!/usr/bin/env python # coding: utf8 import prefect from prefect import task, Flow from prefect.environments.storage.local import Local @task def print_context(): prefect.context.get("logger").info("get-val is '%s'", prefect.context.get("val")) prefect.context.get("logger").info("dot-val is '%s'", prefect.context.val) with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow: with prefect.context(val="SPAM"): print_context() if __name__ == "__main__": flow.register()
and in the second line throws an exception. Thus, the context's
is only valid in the
block. But what's the purpose of
if not passing some simple constants around? I can also write
with the same result: not available in task.
with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow: prefect.context.val="SPAM" print_context()
Luis Muniz07/21/2020, 10:04 AM
@task def get_data(chunk_size): #fetch connection curr = connection.cursor() curr.execute(sql.SQL("select id from big_table")) collection = curr.fetch_many(chunk_size)) while (!collection.is_empty()): #spawn task(collection) <----- *Here spwan a task* collection = curr.fetch_many(chunk_size))
Ben Davison07/21/2020, 12:36 PM
Preston Marshall07/21/2020, 1:40 PM
Sven Teresniak07/21/2020, 3:52 PM
Florian L07/21/2020, 3:59 PM
Chris Goddard07/21/2020, 5:40 PM
Pedro Machado07/21/2020, 5:43 PM
More generally it would be great to be able to use context + parameters to define the flow run name.
Matt Wong-Kemp07/21/2020, 7:24 PM
but I'm getting an error:
async def do_thing_async_impl(a,b,c): await ... ... @task def do_thing(a,b,c): return asyncio.run_until.complete(do_thing_async(a,b,c))
Before I got digging into event loop fun, is there an easier way to do an async task?
Future <Future pending cb=[BaseSelectorEventLoop._sock_connect_done(9)()]> attached to a different loop')
karteekaddanki07/21/2020, 10:27 PM
and are invoked via Python. How can I use targets effectively in this case? I've tried to
but as expected, this causes my task to run always. I've worked around this issue by creating a target a layer of indirection by returning a path to a file that contains the filename of the actual file generated by my
program (similar to empty targets in make, the presence of this file is indicates that the task is run and the contents of this file point to the location of the task output). It would be nice if I can avoid this indirection and directly be able to return a
object that doesn't necessarily correspond to a serialized python object. All downstream processes that consume these results treat them as locations. In other words, I am looking for a behavior identical to Luigi. Thanks in advance.
Thomas La Piana07/22/2020, 5:29 AM
Adrien Boutreau07/22/2020, 8:56 AM
Matias Godoy07/22/2020, 9:54 AM
Iain Dillingham07/22/2020, 10:09 AM
environment variables, so I'm using defaults.
Lance Haig07/22/2020, 10:20 AM
Marwan Sarieddine07/22/2020, 1:28 PM
Sven Teresniak07/22/2020, 2:00 PM
Richard Hughes07/22/2020, 2:48 PM
Matt Wong-Kemp07/22/2020, 3:12 PM
Michael C. Grant07/22/2020, 3:49 PM
Shawn Marhanka07/22/2020, 9:59 PM
, but I cannot find how to get the flow_id without registering. Is there a way to get all of the flow mappings (name + id) from a cloud project and then use that when creating flow runs. Or am I going about this all wrong? Thanks for the help.
James Bennett Saxon07/23/2020, 1:58 AM
So I didn't want to get into debugging this because my code could be totally wrong. I was hoping to find some examples of using this and other Prefect Tasks. Are there examples for how to use this and other tasks?
ERROR:prefect.TaskRunner:Unexpected error: AttributeError('__enter__',)
Sven Teresniak07/23/2020, 7:59 AM
def complex_task_generating_function(singleelement): case(sometask, foo): anothertask(…) … with Flow("foo") as flow: param = Parameter("param", required=False) # generates a list of strings, based on param. len is 0…n elements_to_process = maybe_generate_work_items(param) # when this evaluates to False, all the following is skipped, the apply_map as well! case(isempty_task(elements_to_process), True): # now I either want to add one default element or # somehow do the processing based on the following result generated_default = default_value_generator_task() # maybe so? elements_to_process = task(lambda x: [x])(generated_default) # now the tricky part. # elements_to_process is either a list or just one (runtime dependent) default value result = apply_map(complex_task_generating_function, elements_to_process)
does not know
. I cannot just use
is not a task (its the beef of the flow so to say and in fact the logic of the flow). I found a workaround by doing something like this:
But to write code like the hack-task that basically checks if the flow ran through the isempty-case or not seems odd. I don't want to "check" whether or not the flow used one path or another. The run path through the flow should decide this. How can I write this elegant and easy? Sorry for the long question but I want to learn how to use Prefect properly because in the future I'm going to write a lot of flows.
@task(name="hack", skip_on_upstream_skip=False) def merger_hack(elements, default): return elements or [default] with Flow("foo") as flow: param = Parameter("param", required=False) # same as above elements_to_process = maybe_generate_work_items(param) # same as above case(isempty_task(elements_to_process), True): generated_default = default_value_generator_task() final_list = merger_hack(elements_to_process, generated_default) result = apply_map(complex_task_generating_function, final_list)
bruno.corucho07/23/2020, 9:32 AM
Should I return these partitions and do the delaying and computing using a Prefect's map() from within the flow's scope definition? Thanks in advance! 🙂 And have a great weekend!
df = read_sql_table(table='peanuts', uri=connection, index_col="peanut_id", columns=["peanut_details, peanut_date"], npartitions=1000)
Sven Teresniak07/23/2020, 11:41 AM
-Task seems not to fit.
Thomas Hoeck07/23/2020, 12:16 PM
Klemen Strojan07/23/2020, 12:18 PM
Adam07/23/2020, 4:50 PM
Jason Carter07/23/2020, 7:22 PM
in that order but no flow in the UI.
prefect backend server, prefect server start and prefect agent start
import prefect from prefect import task, Flow @task def hello_task(): logger = prefect.context.get("logger") <http://logger.info|logger.info>("Hello, Cloud!") flow = Flow("hello-flow", tasks=[hello_task]) # flow.run() flow.register()
Ashish Arora07/23/2020, 11:08 PM