Ryan Baker
04/28/2021, 8:15 PM

When I run prefect describe flows --name foo-bar, I get output like this, which includes information about the storage and parameters but contains no information about the run configuration (I'm using Kubernetes):
{
    "archived": false,
    "created": "2021-04-20T19:01:08.156447+00:00",
    "description": null,
    "environment": null,
    "name": "inference-alpha-poc",
    "parameters": [
        .......
    ],
    "project": {
        "name": "recursion-mlops-poc"
    },
    "storage": {
        "__version__": "0.14.16",
        "access_token_secret": "GITHUB_ACCESS_TOKEN",
        "base_url": null,
        "flows": {
            "inference-alpha-poc": "prefect_cloud/inference_flow.py"
        },
        "path": "prefect_cloud/inference_flow.py",
        "ref": "prefect-cloud",
        "repo": "recursionpharma/inference-alpha",
        "secrets": [],
        "type": "GitHub"
    },
    "version": 9
}
Is this expected, or a bug? Why isn't the run configuration shown in this output?
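A minimal sketch, for reference, of how a run config is typically attached before registration, assuming Prefect 0.14.x (the image tag is illustrative):

from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("inference-alpha-poc") as flow:
    ...  # tasks omitted

# Stored server-side at registration, but not shown by prefect describe flows.
flow.run_config = KubernetesRun(image="prefecthq/prefect:0.14.16")
flow.register(project_name="recursion-mlops-poc")

If the flow was registered this way, the omission may simply be a gap in the describe output rather than a missing run config; querying the flow's run_config field via the GraphQL API would confirm what was stored.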
Nathan Atkins
04/28/2021, 8:43 PM

I tried passing upstream_tasks=[task1] to task2. My gut told me this probably wasn't going to do what I wanted, but it was worth a try. My gut was right: task2 only processes as many elements of list2 as there are in list1.
import prefect
from prefect import Flow, task

@task
def mapped(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value

if __name__ == "__main__":
    with Flow("mapped_upstream") as flow:
        list1 = [1, 2]
        task1 = mapped.map(value=list1)
        list2 = [10, 20, 30]
        task2 = mapped.map(value=list2, upstream_tasks=[task1])
    flow.run()
What is the correct way to get mapped task2 to have an upstream dependency on task1?
[2021-04-28 14:38:31-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[0] | 10
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[1] | 20
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
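A minimal sketch of the usual fix, assuming Prefect 0.14.x: wrapping the upstream task in unmapped keeps it a single, non-mapped dependency, so task2 still maps over all three elements of list2:

from prefect import unmapped

# task1 is treated as one upstream dependency instead of being zipped
# element-wise against list2:
task2 = mapped.map(value=list2, upstream_tasks=[unmapped(task1)])

Without unmapped, upstream tasks passed to .map() are themselves mapped over, which is why only len(list1) children of task2 ran in the log above.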
Enda Peng
04/28/2021, 8:51 PM

with Flow("Demo") as flow:
    for_date = Parameter('for_date', required=True, default='20200101')
    for_date_str = dt.datetime.strptime(for_date, "%Y%m%d")
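If the snippet above fails, one likely cause is that a Parameter is itself a task, so dt.datetime.strptime receives a Parameter object at flow-build time rather than a string. A minimal sketch of the usual pattern, deferring the parsing to runtime:

import datetime as dt
from prefect import Flow, Parameter, task

@task
def parse_date(for_date: str) -> dt.datetime:
    # Runs at flow runtime, when the parameter has a concrete value.
    return dt.datetime.strptime(for_date, "%Y%m%d")

with Flow("Demo") as flow:
    for_date = Parameter('for_date', required=True, default='20200101')
    for_date_str = parse_date(for_date)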
flavienbwk
04/28/2021, 11:46 PM

jars
04/28/2021, 11:54 PM

kevin
04/29/2021, 2:02 AM

task(
    fn=lambda a, b: a + b,
    name='some_task'
)(some_a, some_b)
Mahesh
04/29/2021, 7:21 AM

Romain
04/29/2021, 9:21 AM

from prefect import Flow, task, flatten, apply_map

@task
def A():
    return [1, 2, 3]

@task
def B(x):
    return list(range(x))

@task
def C(y):
    return y + 100

def foo(y):
    return C(y)

with Flow('flat map') as f:
    a = A()                           # [1, 2, 3]
    b = B.map(x=a)                    # [[0], [0, 1], [0, 1, 2]]
    c = apply_map(foo, y=flatten(b))  # [100, 100, 101, 100, 101, 102]
While running it, I get: TypeError: can only concatenate list (not "int") to list. Am I missing something?
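A hedged observation: mapping C directly over the flattened result sidesteps the error here, which suggests the flatten annotation is not being applied to each call inside apply_map:

with Flow('flat map') as f:
    a = A()                  # [1, 2, 3]
    b = B.map(x=a)           # [[0], [0, 1], [0, 1, 2]]
    c = C.map(y=flatten(b))  # [100, 100, 101, 100, 101, 102]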
Johan Lundin
04/29/2021, 9:30 AM

Sumit Kumar Rai
04/29/2021, 10:11 AM

Talha
04/29/2021, 11:06 AM

Nikola Lusic
04/29/2021, 12:53 PM

Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\n This may be due to one of the following version mismatches between the flow build and execution environments: \n - python: (flow build with '3.8.5', currently running with '3.7.10')")
I'm running prefect server, prefect agent and registration environment on 3 different Ubuntu machines, all running:
$ python3
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> import prefect
>>> prefect.__version__
'0.14.17'
>>> sys.version_info
sys.version_info(major=3, minor=8, micro=5, releaselevel='final', serial=0)
I don't understand where the agent picks up the Python 3.7.10 version, or why the task is executed with that environment (Python 3.7 isn't even installed on the server or agent machines).
Is there a way to set the Python version on the execution environment, or am I missing something?
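One hedged guess: if the flow executes inside a Docker container (Docker storage, or a Docker/Kubernetes run config), the Python inside that image is what counts, and the default prefecthq/prefect images may ship a different Python than your hosts. A minimal sketch of pinning a matching image, with the tag assumed rather than verified:

from prefect.run_configs import DockerRun

# Pick an image whose Python matches the registration environment (3.8 here).
flow.run_config = DockerRun(image="prefecthq/prefect:0.14.17-python3.8")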
Sean Talia
04/29/2021, 1:35 PM

Robin Cole
04/29/2021, 3:40 PM

RequestResponse. I also log the response from the lambdas to a db, currently at the end of the batch job. My question is whether I am better off sticking with running the batch job from Prefect, or whether I should use Prefect to call the 1000 lambdas in parallel using lambda Event mode and rely on Prefect's ability to handle retries to manage the show? A related question: when is the best time to log to the db? Thanks in advance! ps loving Prefect way more than Airflow

Danielle Dalton
04/29/2021, 5:56 PM

We have a class api_data_to_s3 that sends json data to S3. In other flows, we import this class and pass it two arguments, the vendor name and the api_endpoint (ex. api_data_to_s3(vendor="google", api_endpoint="keywords")). The issue is that some vendors have multiple endpoints, and when we try to parameterize the api_endpoint we get some errors. I'll post some code in the thread below, but I'd love to know the recommended way to solve this.
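Without the thread code it's hard to be specific, but a minimal sketch of one way to parameterize multiple endpoints, with the wrapper names here hypothetical: wrap the call in a task and map it over an endpoint-list Parameter.

from prefect import Flow, Parameter, task, unmapped

@task
def send_to_s3(vendor: str, api_endpoint: str):
    # Hypothetical wrapper around the api_data_to_s3 class described above.
    api_data_to_s3(vendor=vendor, api_endpoint=api_endpoint)

with Flow("vendor-api-to-s3") as flow:
    endpoints = Parameter("api_endpoints", default=["keywords"])
    send_to_s3.map(vendor=unmapped("google"), api_endpoint=endpoints)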
Joe
04/29/2021, 6:09 PM

kevin
04/29/2021, 6:25 PM

d_task = task(lambda: some_dict)
m_task = task(lambda x: do_map_stuff(x))

with Flow('flow') as f:
    some_data = d_task()
    mapped_result = m_task.map(some_data.values())
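A minimal sketch of one way to make the dict mapping work, assuming Prefect 0.14.x: a task's result can't have .values() called on it at flow-build time, so the extraction itself needs to be a task (the lambdas here are hypothetical stand-ins):

from prefect import Flow, task

d_task = task(lambda: {"a": 1, "b": 2}, name="make_dict")
get_values = task(lambda d: list(d.values()), name="get_values")
m_task = task(lambda x: x * 10, name="map_stuff")

with Flow("flow") as f:
    some_data = d_task()
    mapped_result = m_task.map(get_values(some_data))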
Diogo Munaro
04/29/2021, 10:05 PM

Matthew Blau
04/30/2021, 12:28 AM

Trevor Kramer
04/30/2021, 1:28 AM

Talha
04/30/2021, 11:47 AM

emre
04/30/2021, 2:49 PM

I've noticed a behavior with merge, and I find it kind of unexpected. Here goes:
Usually, when using something like ifelse and merge, the flow goes like:

skip_if_true -> actual_task_for_false -> merge
skip_if_false -> actual_task_for_true -> merge

This works as expected. However, I tried to directly merge tasks that can or cannot SKIP:
skippable_1 = do_i_skip(skip=True)
skippable_2 = do_i_skip(skip=False)
merge(skippable_1, skippable_2)
merge results in the SKIP signal raised in skippable_1, rather than the result of skippable_2. That is unexpected, at least for me. Is this expected behavior? Can I somehow connect these explicitly SKIPping tasks with a merge directly?
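For contrast, a minimal sketch of the ifelse/merge pattern described above that does behave as expected, assuming Prefect 0.14.x; as I understand it, merge is meant to return the first upstream result that is not skipped, which is why picking up skippable_1's SKIP looks surprising:

from prefect import Flow, task
from prefect.tasks.control_flow import ifelse, merge

@task
def check_condition():
    return True

@task
def on_true():
    return "true branch"

@task
def on_false():
    return "false branch"

with Flow("conditional") as flow:
    cond = check_condition()
    t = on_true()
    f = on_false()
    ifelse(cond, t, f)    # skips whichever branch doesn't match
    result = merge(t, f)  # returns the branch that actually ran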
Josselin Girault
04/30/2021, 3:10 PM

@task
def log_scheduled_start_time():
    """Print scheduled_start_time in logger."""
    logger = prefect.context.get("logger")
    scheduled_start_time = prefect.context.get("scheduled_start_time")
    flow_name = prefect.context.get("flow_name")
    logger.info(f"{flow_name}: {scheduled_start_time}")

with Flow("subflow") as subflow:
    log_scheduled_start_time()

subflow.register(project_name="test_schedule", labels=["test"])

with Flow("mainflow") as mainflow:
    log_scheduled_start_time()
    StartFlowRun(
        flow_name="subflow", project_name="test_schedule", wait=True,
        scheduled_start_time=prefect.context.get("scheduled_start_time")
    )

mainflow.register(project_name="test_schedule", labels=["test"])
Expectation: mainflow's scheduled_start_time is passed down to subflow, and both flows log the same time.
Reality: subflow starts without a scheduled_start_time, defaults to now(), and the logged times are different.
Use case: subflow depends on scheduled_start_time to query some database; if the agent that was supposed to run mainflow is down and mainflow and subflow are run a day later, subflow's query is then incorrect.
Bonus question: I can't seem to find documentation on how to pass parameters from one flow to another / use a flow's results as parameters for another 🙇
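A hedged sketch of a workaround: the prefect.context.get("scheduled_start_time") call inside the mainflow block runs at registration time, when the context is empty, so the kwarg is baked in as None. Computing the value in a task defers the lookup to runtime, and kwargs passed to StartFlowRun at call time (including parameters=, which would also cover the bonus question) are resolved then:

import prefect
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

@task
def get_scheduled_start_time():
    # Runs inside the flow run, where the context is populated.
    return prefect.context.get("scheduled_start_time")

start_subflow = StartFlowRun(
    flow_name="subflow", project_name="test_schedule", wait=True
)

with Flow("mainflow") as mainflow:
    sst = get_scheduled_start_time()
    start_subflow(scheduled_start_time=sst)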
Milton Tavares Neto
04/30/2021, 3:14 PM

Frederick Thomas
04/30/2021, 4:01 PM

def register__flow():
    with Flow("FlowName", schedule=None) as flow:
        file_name = Parameter(name='file_name', default=None)
        params = get_params()
        p = get_file_blob(
            file_name=f"notebooks/{file_name}",
            con_string=params["uploads_blob"],
            container="uploads"
        )
        flow.add_edge(file_name, params)
        flow.add_edge(params, p)
    if len(sys.argv) > 1 and sys.argv[1] == "register":
        flow.register(project_name="project_x")
    elif len(sys.argv) > 2 and sys.argv[1] == "test" and len(sys.argv[2]) > 1:
        flow.run(parameters={'file_name': sys.argv[2]})
    else:
        print("Please use a quoted string of the file name...")

if __name__ == "__main__":
    register__flow()
Thanks
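One hedged observation on the snippet above: f"notebooks/{file_name}" is evaluated while the flow is being built, when file_name is still a Parameter object, so the blob path ends up containing the Parameter's repr rather than the runtime value. A minimal sketch of the usual fix, building the path in a task:

from prefect import task

@task
def notebook_path(file_name: str) -> str:
    # Runs at flow runtime, when file_name is a concrete string.
    return f"notebooks/{file_name}"

and then passing notebook_path(file_name) into get_file_blob inside the flow block.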
Sean Talia
04/30/2021, 4:03 PM

Is there a way to use PrefectSecret to add a new item to the prefect.context? I see that you can add to the global context by doing something like:
with prefect.context(key='abc'):
    flow.run()
but it's not clear to me how to actually supply this value to the runtime context when my agents are deploying the flows and I'm not explicitly calling flow.run(). For context, I'm reading through the documentation on adding Slack handlers, and I see that all you need to do is set a SLACK_WEBHOOK_URL secret in order for the built-in slack_notifier to be able to post to your channel.
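Not authoritative, but secrets generally don't need to be injected into the context by hand for agent-deployed runs: with Prefect Cloud, the SLACK_WEBHOOK_URL secret is fetched at runtime by the handler itself, and for local runs a secret can be set under [context.secrets] in config.toml. A minimal sketch of reading one explicitly, assuming Prefect 0.14.x:

from prefect.client import Secret

# Resolved at runtime: from Cloud if available, else from local config.
webhook_url = Secret("SLACK_WEBHOOK_URL").get()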
Matthew Blau
04/30/2021, 7:22 PM

Enda Peng
04/30/2021, 7:45 PM

• Option 1: Single flow + Dask: process every file within one flow, e.g.

@task
def process_file(x):
    ...

with Flow(xxx):
    [process_file(x) for x in file_names]

It works fine with LocalRun.
◦ pro: Easy to set up; dask has a friendly api to control resources.
◦ con: I have to set up the same running env for every worker added in this dask cluster.
• Option 2: Multiple flows + multiple agents: 1-on-1 mapping between flow and file
e.g. I could create 10 docker agents running on 10 hosts. Then in a script I create and run 100 flows, each flow processing one file, and let Prefect distribute the flows for me.
◦ pro: The computation module is shipped with the docker image, so it is setup-free.
◦ con: Not sure whether Prefect is supposed to do the workload-distribution duty. Even if yes, it is hard to control resource consumption.
• Option 3: Single flow + K8s:
Build an image for my computation module and register it with K8s first. Within one flow, create k8s tasks that request 100 pods to process the files.
◦ pro:
▪︎ k8s could deal with the workload distribution. Adding nodes could be easy.
▪︎ Any agent would be fine as long as it could talk to the k8s api.
◦ con: complexity of setting up k8s?
Appreciate any thoughts and input here!
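On option 1, a minimal sketch of pointing a single flow at an existing Dask cluster, assuming Prefect 0.14.x; the scheduler address is hypothetical:

from prefect.executors import DaskExecutor

# Mapped tasks fan out across the cluster's workers.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")

The stated con still applies: each Dask worker needs the same execution environment as the flow.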
Carter Kwon
04/30/2021, 8:08 PM

20,000 * 0.0025 = $50 or 20,000 * 0.005 = $100, depending on the plan you're on?

Belal Aboabdo
04/30/2021, 9:59 PM

prefect build -p my_flow.py

which throws this usage error:

Usage: prefect [OPTIONS] COMMAND [ARGS]...
Try 'prefect -h' for help.

Error: No such command 'build'.
Exited with code exit status 2
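A hedged note: the prefect build command was only added to the CLI in a later 0.14.x release, so this usage error usually just means the installed version predates it. Upgrading (pip install -U prefect) should make it available; prefect version shows what is currently installed, and prefect register -h shows the registration options your version supports.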