Salohy  04/28/2021, 1:24 PM
import prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
STORAGE = Docker(registry_url="prefect.azurecr.io", image_name="prefect/test")
RUN_CONFIG = KubernetesRun(image="prefect.azurecr.io/prefect/test")
EXECUTOR = DaskExecutor(address="tcp://my-ip:8786")
@task
def hello():
    logger = prefect.context.get('logger')
    logger.info("Hello!")

with Flow("changeme", storage=STORAGE, run_config=RUN_CONFIG, executor=EXECUTOR) as flow:
    hello()
When I run this flow, it is successfully submitted but never gets executed and the status stays Pending forever. Am I missing something? Do I need to call flow.run() in my code? Many thanks already for helping me 🙂🙏
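A flow that sits in Pending usually means no agent has claimed the run, and a common cause is a label mismatch between the run config and the agent; flow.run() is not needed for registered flows, since an agent executes them. A minimal sketch of one way to rule out the label issue (the label name here is an assumption):

from prefect.run_configs import KubernetesRun

# the label must match whatever the agent was started with,
# e.g. `prefect agent kubernetes start --label k8s`
RUN_CONFIG = KubernetesRun(image="prefect.azurecr.io/prefect/test", labels=["k8s"])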
Ben Collier  04/28/2021, 3:58 PM
Vikram Thirumalai  04/28/2021, 4:36 PM
Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002C254411C10>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
Jacob Wilson  04/28/2021, 5:11 PM
Ryan Baker  04/28/2021, 8:15 PM
When I run prefect describe flows --name foo-bar I get output like this, which includes information about the storage and parameters but contains no information about the run configuration (I'm using Kubernetes):
{
    "archived": false,
    "created": "2021-04-20T19:01:08.156447+00:00",
    "description": null,
    "environment": null,
    "name": "inference-alpha-poc",
    "parameters": [
        .......
    ],
    "project": {
        "name": "recursion-mlops-poc"
    },
    "storage": {
        "__version__": "0.14.16",
        "access_token_secret": "GITHUB_ACCESS_TOKEN",
        "base_url": null,
        "flows": {
            "inference-alpha-poc": "prefect_cloud/inference_flow.py"
        },
        "path": "prefect_cloud/inference_flow.py",
        "ref": "prefect-cloud",
        "repo": "recursionpharma/inference-alpha",
        "secrets": [],
        "type": "GitHub"
    },
    "version": 9
}
Is this expected, or a bug? Why isn't the run configuration shown in this output?
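One way to check what the backend actually stored for the run config is to query the GraphQL API with the Python client; a minimal sketch, assuming the flow type exposes a run_config field (the query shape and filters are assumptions):

from prefect import Client

client = Client()
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "inference-alpha-poc"}, archived: {_eq: false}}) {
        name
        version
        run_config
      }
    }
    """
)
print(result)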
Nathan Atkins  04/28/2021, 8:43 PM
I added upstream_tasks=[task1] to task2. My gut told me this probably wasn't going to do what I wanted, but it was worth a try. My gut was right: task2 only processed as many elements of list2 as there are in list1.
import prefect
from prefect import Flow, task

@task
def mapped(value: int) -> int:
    prefect.context.get("logger").info(f"{value}")
    return value

if __name__ == "__main__":
    with Flow("mapped_upstream") as flow:
        list1 = [1, 2]
        task1 = mapped.map(value=list1)
        list2 = [10, 20, 30]
        task2 = mapped.map(value=list2, upstream_tasks=[task1])
    flow.run()
What is the correct way to get mapped task2 to have an upstream dependency on task1?
[2021-04-28 14:38:31-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[0] | 1
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:31-0600] INFO - prefect.mapped[1] | 2
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[0] | 10
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
[2021-04-28 14:38:32-0600] INFO - prefect.mapped[1] | 20
[2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
[2021-04-28 14:38:32-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
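If the goal is for every task2 child to wait on all of task1, the usual fix is to wrap the upstream in unmapped() so it is treated as a single reduced dependency rather than being mapped over as well; a minimal sketch:

from prefect import Flow, task, unmapped

with Flow("mapped_upstream") as flow:
    task1 = mapped.map(value=[1, 2])
    # unmapped() marks task1 as a non-mapped upstream, so all three elements
    # of list2 run, each waiting for the whole of task1 to finish
    task2 = mapped.map(value=[10, 20, 30], upstream_tasks=[unmapped(task1)])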
Enda Peng  04/28/2021, 8:51 PM
with Flow("Demo") as flow:
    for_date = Parameter('for_date', required=True, default='20200101')
    for_date_str = dt.datetime.strptime(for_date, "%Y%m%d")
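Inside the Flow block, for_date is a Parameter (a Task), not a string, so strptime cannot be applied to it at build time; deferring the conversion to a task is one way around that. A minimal sketch under that assumption:

import datetime as dt
from prefect import Flow, Parameter, task

@task
def parse_date(for_date: str) -> dt.datetime:
    # runs at flow-run time, when the parameter value is an actual string
    return dt.datetime.strptime(for_date, "%Y%m%d")

with Flow("Demo") as flow:
    for_date = Parameter("for_date", default="20200101")
    for_date_str = parse_date(for_date)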
flavienbwk  04/28/2021, 11:46 PM
jars  04/28/2021, 11:54 PM
kevin  04/29/2021, 2:02 AM
task(
    fn=lambda a, b: a + b,
    name='some_task'
)(some_a, some_b)
Mahesh  04/29/2021, 7:21 AM
Romain  04/29/2021, 9:21 AM
from prefect import Flow, task, flatten, apply_map
@task
def A():
    return [1, 2, 3]

@task
def B(x):
    return list(range(x))

@task
def C(y):
    return y + 100

def foo(y):
    return C(y)

with Flow('flat map') as f:
    a = A()                           # [1, 2, 3]
    b = B.map(x=a)                    # [[0], [0, 1], [0, 1, 2]]
    c = apply_map(foo, y=flatten(b))  # [100, 100, 101, 100, 101, 102]
While running it, I get: "TypeError: can only concatenate list (not "int") to list"
Am I missing something?
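If mapping C directly is acceptable, flattening straight into a plain .map call sidesteps apply_map entirely; a sketch of that variant (assuming the apply_map wrapper is not otherwise needed):

with Flow('flat map') as f:
    a = A()                  # [1, 2, 3]
    b = B.map(x=a)           # [[0], [0, 1], [0, 1, 2]]
    c = C.map(y=flatten(b))  # [100, 100, 101, 100, 101, 102]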
Johan Lundin  04/29/2021, 9:30 AM
Sumit Kumar Rai  04/29/2021, 10:11 AM
Talha  04/29/2021, 11:06 AM
Nikola Lusic  04/29/2021, 12:53 PM
Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\n This may be due to one of the following version mismatches between the flow build and execution environments: \n - python: (flow build with '3.8.5', currently running with '3.7.10')")
I'm running prefect server, prefect agent and registration environment on 3 different Ubuntu machines, all running:
$ python3
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> import prefect
>>> prefect.__version__
'0.14.17'
>>> sys.version_info
sys.version_info(major=3, minor=8, micro=5, releaselevel='final', serial=0)
I don't understand where the agent picks up the Python 3.7.10 version, or why the task is executed with that environment (Python 3.7 isn't even installed on the server or the agent machine).
Is there a way to set the Python version of the execution environment, or am I missing something?
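If the agent is Docker- or Kubernetes-based, the flow is unpickled inside the container image, so the image's Python version (not the host's) is what shows up in that error. Pinning an image whose Python matches the build environment is one way to control it; a sketch, where the image tag follows the assumed prefecthq/prefect tag naming of <version>-python<minor>:

from prefect.run_configs import DockerRun

run_config = DockerRun(image="prefecthq/prefect:0.14.17-python3.8")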
Sean Talia  04/29/2021, 1:35 PM
Robin Cole  04/29/2021, 3:40 PM
I currently invoke my lambdas from a batch job using RequestResponse. I also log the response from the lambdas to a db, currently at the end of the batch job. My question is whether I am better off sticking with running the batch job from Prefect, or using Prefect to call the 1000 lambdas in parallel in lambda Event mode and then relying on Prefect's ability to handle retries to manage the show..? Also, a related question: when is the best time to log to the db? Thanks in advance! PS: loving Prefect way more than Airflow
Danielle Dalton  04/29/2021, 5:56 PM
We have a class called api_data_to_s3 that sends JSON data to S3. In other flows, we import this class and pass it two arguments, the vendor name and the api_endpoint (e.g. api_data_to_s3(vendor="google", api_endpoint="keywords")). The issue is that some vendors have multiple endpoints, and when we try to parameterize the api_endpoint we get some errors. I'll post some code in the thread below, but I'd love to know the recommended way to solve this.
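Without the thread code it is hard to be specific, but one common pattern for the multi-endpoint case is to make the endpoints a Parameter and map over it; a rough sketch, assuming api_data_to_s3 behaves like a Prefect Task instance (everything not named in the question is made up):

from prefect import Flow, Parameter, unmapped

with Flow("google-api-to-s3") as flow:
    api_endpoints = Parameter("api_endpoints", default=["keywords"])
    api_data_to_s3.map(vendor=unmapped("google"), api_endpoint=api_endpoints)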
Joe  04/29/2021, 6:09 PM
kevin  04/29/2021, 6:25 PM
d_task = task(lambda: some_dict)
m_task = task(lambda x: do_map_stuff(x))

with Flow('flow') as f:
    some_data = d_task()
    mapped_result = m_task.map(some_data.values())
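Inside the Flow block some_data is a Task, not a dict, so .values() cannot be taken at build time; extracting the values in another small task is one way to do it. A minimal sketch under that assumption:

from prefect import Flow, task

get_values = task(lambda d: list(d.values()), name="get_values")

with Flow("flow") as f:
    some_data = d_task()
    mapped_result = m_task.map(get_values(some_data))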
Diogo Munaro  04/29/2021, 10:05 PM
Matthew Blau  04/30/2021, 12:28 AM
Trevor Kramer  04/30/2021, 1:28 AM
Talha  04/30/2021, 11:47 AM
emre  04/30/2021, 2:49 PM
I ran into some behavior with merge, and I find it kind of unexpected. Here goes:
Usually, when using something like ifelse and merge, the flow goes like:
skip_if_true -> actual_task_for_false -> merge
skip_if_false -> actual_task_for_true -> merge
This works as expected. However, I tried to directly merge tasks that may or may not SKIP:
skippable_1 = do_i_skip(skip=True)
skippable_2 = do_i_skip(skip=False)
merge(skippable_1, skippable_2)
The merge results in the SKIP signal raised in skippable_1, rather than skippable_2. That is unexpected, at least for me. Is this expected behavior? Can I somehow connect these explicitly SKIPping tasks with a merge directly?
Josselin Girault  04/30/2021, 3:10 PM
@task
def log_scheduled_start_time():
    """Print scheduled_start_time in logger."""
    logger = prefect.context.get("logger")
    scheduled_start_time = prefect.context.get("scheduled_start_time")
    flow_name = prefect.context.get("flow_name")
    logger.info(f"{flow_name}: {scheduled_start_time}")

with Flow("subflow") as subflow:
    log_scheduled_start_time()
subflow.register(project_name="test_schedule", labels=["test"])

with Flow("mainflow") as mainflow:
    log_scheduled_start_time()
    StartFlowRun(
        flow_name="subflow", project_name="test_schedule", wait=True,
        scheduled_start_time=prefect.context.get("scheduled_start_time")
    )
mainflow.register(project_name="test_schedule", labels=["test"])
Expectation: mainflow's scheduled_start_time is passed down to subflow, and both flows log the same time.
Reality: subflow starts without a scheduled_start_time, defaults to now(), and the logged times are different.
Use case: subflow depends on scheduled_start_time to query some database; if the agent that was supposed to run mainflow is down and both flows are run a day later, subflow's query is then incorrect.
Bonus question: I can't seem to find documentation on how to pass parameters from one flow to another / use a flow's results as parameters for another 🙇
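The prefect.context.get("scheduled_start_time") call inside the mainflow block is evaluated at registration time, not at flow-run time, which would explain why the subflow never receives it. A sketch of one workaround under that reading: resolve the context inside a task and pass the value when the StartFlowRun task is called (this assumes StartFlowRun.run accepts scheduled_start_time like its constructor does):

import prefect
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

@task
def get_scheduled_start_time():
    # resolved during the flow run, so it reflects mainflow's actual schedule
    return prefect.context.get("scheduled_start_time")

start_subflow = StartFlowRun(flow_name="subflow", project_name="test_schedule", wait=True)

with Flow("mainflow") as mainflow:
    log_scheduled_start_time()
    start_subflow(scheduled_start_time=get_scheduled_start_time())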
Milton Tavares Neto  04/30/2021, 3:14 PM
Frederick Thomas  04/30/2021, 4:01 PM
def register__flow():
    with Flow("FlowName", schedule=None) as flow:
        file_name = Parameter(name='file_name', default=None)
        params = get_params()
        p = get_file_blob(
            file_name=f"notebooks/{file_name}",
            con_string=params["uploads_blob"],
            container="uploads"
        )
        flow.add_edge(file_name, params)
        flow.add_edge(params, p)
    if len(sys.argv) > 1 and sys.argv[1] == "register":
        flow.register(project_name="project_x")
    elif len(sys.argv) > 1 and sys.argv[1] == "test" and len(sys.argv[2]) > 1:
        flow.run(parameters={'file_name': sys.argv[2]})
    else:
        print("Please use a quoted string of the file name...")

if __name__ == "__main__":
    register__flow()
Thanks
Sean Talia  04/30/2021, 4:03 PM
Is there a way to use a PrefectSecret to add a new item to the prefect.context? I see that you can add to the global context by doing something like:
with prefect.context(key='abc'):
    flow.run()
but it's not clear to me how to actually supply this value to the runtime context when my agents are deploying the flows and I'm not explicitly calling flow.run(). For context, I'm reading through the documentation on adding Slack handlers, and I see that all you need to do is set a SLACK_WEBHOOK_URL secret in order for the built-in slack_notifier to be able to post to your channel.
Kevin Kho  04/30/2021, 4:09 PM
Sean Talia  04/30/2021, 4:18 PM
Is there a way to set PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL at runtime via a task?
Kevin Kho  04/30/2021, 4:21 PM
prefect local agent start --env var1
DockerRun(env={"SOME_VAR": "value"})
Sean Talia  04/30/2021, 4:24 PM
1) I'd rather not have var1 stashed on the host where I'm running the agent, but also 2) I'd like for different teams at my org to notify different channels as they need, and I was hoping they could make those adjustments in their flow code rather than my having to configure a pattern of agents/labels to determine that.
Kevin Kho  04/30/2021, 4:26 PM
Sean Talia  04/30/2021, 4:33 PM
Kevin Kho  04/30/2021, 4:35 PM
Check slack_notifier in the docs. It takes in a webhook_secret.
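A small sketch of what that looks like, so each team can point the handler at its own secret (the secret name here is an assumption):

from prefect import task
from prefect.utilities.notifications import slack_notifier

# each team supplies its own secret holding its webhook URL
handler = slack_notifier(webhook_secret="TEAM_A_SLACK_WEBHOOK_URL")

@task(state_handlers=[handler])
def notify_me():
    ...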
Sean Talia  04/30/2021, 5:46 PM
Kevin Kho  04/30/2021, 6:09 PM
Sean Talia  04/30/2021, 6:34 PM