alvin goh  04/28/2020, 5:13 PM
Kevin Systrom  04/28/2020, 9:06 PM
Matthew Maldonado  04/28/2020, 11:31 PM
Kevin Systrom  04/29/2020, 12:49 AM
Kevin Systrom  04/29/2020, 12:52 AM
import prefect
from prefect.engine import cache_validators
from prefect.engine import signals
from prefect import task, Flow
import pandas as pd
@task(cache_for=pd.Timedelta(minutes=2),
      cache_validator=cache_validators.all_inputs)
def task_b(n):
    print(f'Running task b: {n}')

with Flow('CacheTests') as flow:
    task_b.map([1,2,3])

with prefect.context():
    flow.run()
Kevin Systrom  04/29/2020, 12:52 AM
Sanjay Patel  04/29/2020, 1:08 AM
simids = [1, 2, 3]
with Flow('example') as flow:
    x = task1.map(simids)
    y = task2.map(x)
    z = task3.map(x, y)

    # start a DB session
    write_session = create_db_session.map()
    task_write_to_db.map(write_session, x, z)

    # summarize task, after completion of all mapped results - this part executes too early, before all mapped tasks are complete
    read_session = create_db_session()
    PostProcessResults()
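A minimal sketch (with placeholder tasks, not the ones above) of one way to force that ordering in Prefect 0.x: the functional API accepts an upstream_tasks keyword, so the summary task can be made to wait on the whole mapped write step.

from prefect import task, Flow

@task
def write_row(n):
    print(f"writing {n}")

@task
def post_process():
    print("summarizing after all writes are done")

with Flow("ordering-sketch") as flow:
    writes = write_row.map([1, 2, 3])
    # upstream_tasks makes post_process wait for every mapped write to finish
    post_process(upstream_tasks=[writes])

flow.run()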
Matthew Maldonado  04/29/2020, 2:20 AM
alvin goh  04/29/2020, 3:59 AM
Aakarshi Goel  04/29/2020, 6:36 AM
Aakarshi Goel  04/29/2020, 6:39 AM
David Ojeda  04/29/2020, 9:20 AM
%-interpolation, .format and f-strings. I personally avoid %-interpolation; .format is very popular, but I almost always use f-strings.
The logging module is the sole exception, in my opinion, where %-interpolation should be used, because it uses lazy evaluation:
logger.debug('My object is %s', some_object)
In that example, the some_object.__str__ method will not be called if the logger level (or is it the handler?) is not low enough to process debug messages. This is different from
logger.debug('My object is %s' % my_object)
or
logger.debug('My object is {}'.format(my_object))
or
logger.debug(f'My object is {my_object}')
because there is no lazy evaluation: the string will be formatted first, then sent to the logger.
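A small self-contained illustration of that difference (the Expensive class here is made up for the demo):

import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records will be discarded
logger = logging.getLogger(__name__)

class Expensive:
    def __str__(self):
        print("__str__ was called")
        return "expensive repr"

obj = Expensive()
logger.debug('My object is %s', obj)   # lazy: __str__ never runs, the record is dropped before formatting
logger.debug(f'My object is {obj}')    # eager: __str__ runs before logging even decides to drop the record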
In a Prefect server setting, where logs are sent to the server, each log record needs to be serializable. This is achieved with json.dumps on the record received by prefect.utilities.logging.CloudHandler.emit. This record has a field args with the args passed to the logger.log call, which can have non-serializable objects.
What I have seen is that the server logs have many critical messages saying:
Failed to write log with error: TypeError: Object of type X is not JSON serializable
I can change my logging calls to do
logging.debug('My object is %s', str(my_object))
but I really feel like this is a workaround for a bug in CloudHandler: perhaps the args should not be sent to the server, or unserializable args objects could be redacted.
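A hedged sketch of what that redaction could look like (not actual Prefect code): stringify any logging args that json.dumps cannot handle before shipping the record.

import json

def redact_unserializable(args):
    # return a copy of a log record's args where anything json can't serialize becomes its repr
    safe = []
    for arg in (args or ()):
        try:
            json.dumps(arg)
            safe.append(arg)
        except TypeError:
            safe.append(repr(arg))
    return tuple(safe)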
What do you guys think is the appropriate fix here?
Thomas La Piana  04/29/2020, 12:48 PM
Sanjay Patel  04/29/2020, 3:03 PM
Last State Message [10:44am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'wf'",)
I should note that I did have this working a couple of weeks ago, but with some recent code changes / environment changes / dependencies perhaps, I can no longer reproduce a successful run after executing.
I can successfully get a simple example flow to run on Prefect Core Server, so my problem here is specific to our code base, I'm sure. But I'm not sure how to start diagnosing why it runs when I test it with flow.run() and then has a problem with Prefect Core Server, with everything still running on my local host. I've also tried calling flow.serialize() and it successfully prints out a big dictionary, but I don't know exactly what I'm looking for.
An alternate option is to start trying to containerize with Docker - which I'll likely need to do to move off my host machine anyway (as directed by prefect developers). Therefore my second question would be: will manually creating this dockerized container approach still work locally as well? Still in the initial functionality and testing phase, so just trying to get some examples working with our code base.
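A hedged sketch of the Docker storage route on the 0.10-era API, applied to the existing flow object (the paths and dependency list are placeholders; the missing 'wf' package is copied into the image so the flow can import it). A flow stored this way should also run locally as long as a local Docker agent is polling.

from prefect.environments.storage import Docker

flow.storage = Docker(
    python_dependencies=["pandas"],            # placeholder dependency list
    files={"/local/path/to/wf": "/modules/wf"},  # copy the 'wf' package into the image
    env_vars={"PYTHONPATH": "/modules"},         # make it importable inside the container
)
flow.register()  # builds and pushes the storage by default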
Thanks so much
Lukas  04/29/2020, 4:41 PM
pd.DataFrame objects. In the end I have one task "gathering" those DataFrame results to write them all into a Postgres DB (the reason to have it all in one task is that I want either all results to be uploaded or none of them in case something fails). The function that is supposed to write these results into the DB takes a dict as argument, with the key tablename and the value dataFrame. So in the function I loop over the dict and write the data frame into the corresponding table. Here is where something goes wrong: my keys and values get mixed up and the function ends up trying to write the dataframes into the wrong table, which obviously causes errors.
My flow looks somewhat like this:
task_result_1 = task1()
task_result_2 = task2()
task_result_3 = task3()
upload_dfs(
    {
        "table1": task_result_1,
        "table2": task_result_2,
        "table3": task_result_3,
    },
)
In the flow, prefect automatically creates a List and a Dict task. Is the confusion in my keys / values somehow related to the sorting of items in the List class? https://github.com/PrefectHQ/prefect/blob/eb59918d98b15ba6e14c0a406cee885c0e44ea8b/src/prefect/tasks/core/collections.py#L73
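One hedged workaround sketch, reusing the task names from the snippet above: build the mapping inside a single task, so each DataFrame stays paired with its table name no matter how the auto-generated collection tasks order their inputs.

from prefect import task, Flow

@task
def build_table_map(df1, df2, df3):
    # the key/value pairing is fixed inside one task, so it cannot be reshuffled upstream
    return {"table1": df1, "table2": df2, "table3": df3}

with Flow("upload-sketch") as flow:
    upload_dfs(build_table_map(task1(), task2(), task3()))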
Thanks a lot! 🙂
Troy Sankey  04/29/2020, 5:45 PM
default one instead). Also, more fundamentally, the prefect code is hard-coded to use the following string for the container args: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/k8s/job.py#L276
"python -c 'import prefect; prefect.Flow.load(prefect.context.flow_file_path).environment.run_flow()'"
but actually the job spec ends up using a different string!
$ kubectl --namespace=prefect get job prefect-job-e4b101a5 -o yaml | grep -A1 args
- args:
- prefect execute cloud-flow
I checked the version of prefect on my laptop (which I use to deploy the prefect flow) and the prefect agent image deployed to my cluster. It's all 0.10.x.
Zach  04/29/2020, 10:39 PM
I see the ifelse flow conditional, but it doesn't really make sense how I would use it when I have multiple "true_tasks" and they have output that feeds into each other
Zach  04/29/2020, 10:41 PM
How does the ifelse flow conditional work when I only need the "if" and not the "else"?
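A hedged sketch of how this is often handled in Prefect 0.x (assuming the prefect.tasks.control_flow API): gate only the first task of the chain, since skips propagate to downstream tasks by default, and use switch with a single case when there is no "else" branch.

from prefect import task, Flow
from prefect.tasks.control_flow import switch

@task
def check_condition():
    return True  # stand-in for the real condition

@task
def step_one():
    return 1

@task
def step_two(x):
    return x + 1

with Flow("conditional-sketch") as flow:
    cond = check_condition()
    first = step_one()
    second = step_two(first)  # downstream of step_one, so a skip on step_one propagates here
    # single-case switch: run the "if" branch when cond is True, skip it (and its downstream) otherwise
    switch(cond, {True: first})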
04/30/2020, 12:26 AMstorage.build()
or flow.register(build=True)
I'm calling flow.save()
and flow.register(build=False)
in my Dockerfile and the registered flows look correct(Storage, Flow Locations, etc. look good in Cloud UI) but my flow runs fail with:
Unexpected error while running flow: KeyError('a60e3215-88c7-499c-b525-83ba87b817f2')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 241, in run
    parameters=parameters,
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 185, in initialize_run
    task = tasks[task_run.task_slug]
KeyError: 'a60e3215-88c7-499c-b525-83ba87b817f2'
(I do see this doc: https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html and that's close but not quite what I'm looking for)
Rubens Sôto  04/30/2020, 12:39 AM
Hendrik Petzer  04/30/2020, 7:48 AM
Kotra Pali  04/30/2020, 12:53 PM
Rename(source='cola', target='colb'),
Transpose(source=[...]),
Map(colname="a", values={3:4, 5:7}),
Assign(condition='colb>2', target="cola", value=100)
...
We have about 100k of these per single processing and there are some dependencies between commands (e.g. you can see that cola depends on values of colb in the last Assign command, and we are able to parse these dependencies in advance).
What I am thinking about is to create `prefect.Task`s from these transformation funcs and create a graph. That means a DAG with like 100k nodes with at least some edges (but each of them very lightweight). Do you think it's a good fit/idea for prefect? Can it handle such huge DAGs, or is it rather designed for smaller ones?
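A purely illustrative, hedged sketch of that idea: 'commands' and 'dependencies_of' are assumed to exist from the parsing step, and each command object is assumed to expose an apply() method.

from prefect import Task, Flow

class CommandTask(Task):
    # one lightweight Prefect task wrapping a single transformation command
    def __init__(self, command, **kwargs):
        self.command = command
        super().__init__(name=type(command).__name__, **kwargs)

    def run(self):
        self.command.apply()  # assumed interface on the command objects

with Flow("bulk-transform") as flow:
    task_for = {cmd: CommandTask(cmd) for cmd in commands}
    for cmd, t in task_for.items():
        # wire the parsed dependencies as upstream edges
        flow.set_dependencies(t, upstream_tasks=[task_for[d] for d in dependencies_of(cmd)])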
Avi A  04/30/2020, 1:11 PM
Avi A  04/30/2020, 2:21 PM
Jacques  04/30/2020, 2:51 PM
Amit Singh  04/30/2020, 3:09 PM
TypeError: 'GetItem' object is not iterable
Avi A  04/30/2020, 3:12 PM
itay livni  04/30/2020, 4:31 PM
Mark McDonald  04/30/2020, 5:35 PM
Kevin Systrom  04/30/2020, 5:49 PM