Rob Fowler
09/15/2020, 10:08 AM
Marvin
09/15/2020, 10:08 AM
Rob Fowler
09/15/2020, 10:09 AM
Nuno Silva
09/15/2020, 10:13 AM
DaskExecutor with LocalEnvironment. The agent runs locally, and the Dask cluster is running in Azure AKS.
Randomly I get the error: OSError: Timed out trying to connect to 'tcp://dask-user-44479c3f-5.pipelines:8786' after 3 s.
I tried to increase the default timeout with dask.config.set({"distributed.comm.timeouts.connect": 20}), but apparently to no avail; the 3 s timeout is kept.
Any ideas on how to increase the timeout? Thanks
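One possibility, sketched here rather than confirmed: the setting may need to be applied in the process that actually opens the connection (the flow-run environment), not just in the process that registers the flow, and it can also be supplied as a Dask environment variable:
import dask
from prefect.engine.executors import DaskExecutor

# Sketch: raise the comm connect timeout before the executor connects.
# The same setting can be exported on the agent / flow-run environment as
#   DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=30s
# (Dask's standard config-to-environment-variable mapping).
dask.config.set({"distributed.comm.timeouts.connect": "30s"})

# Scheduler address taken from the error message above.
executor = DaskExecutor(address="tcp://dask-user-44479c3f-5.pipelines:8786")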
Rob Fowler
09/15/2020, 10:28 AM
Rob Fowler
09/15/2020, 10:30 AM
Rob Fowler
09/15/2020, 10:33 AM
Julian
09/15/2020, 11:51 AM
first_of_the_week == first_of_the_month? Is this possible just using a schedule and filters, or do I need to check this condition within the flow run and stop one of the scheduled runs? It's crucial that the flow run is not executed twice just because both conditions are met.
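One way to guarantee at most one run per day, sketched here with hypothetical filter functions (not necessarily what was done in this thread): drive the schedule from a single daily clock and combine the two conditions with or_filters, so the "first Monday that is also the 1st" case can never produce two runs:
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

def is_monday(dt: pendulum.DateTime) -> bool:
    # "first of the week" condition
    return dt.day_of_week == pendulum.MONDAY

def is_first_of_month(dt: pendulum.DateTime) -> bool:
    # "first of the month" condition
    return dt.day == 1

# A single daily candidate at 06:00; or_filters keeps a date if EITHER condition
# holds, so a Monday that is also the 1st still yields exactly one run.
schedule = Schedule(clocks=[CronClock("0 6 * * *")], or_filters=[is_monday, is_first_of_month])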
kursat.t
09/15/2020, 2:26 PM
Ben Fogelson
09/15/2020, 3:17 PM
Is there a way to use LocalResult to save results from a flow running in a Docker container to the host filesystem? Here's a toy example of what I'm trying to do:
from prefect.environments.storage import Docker, Local
from prefect.engine.results import LocalResult
from prefect import Flow, task

@task
def foo():
    return "foo"

with Flow('run_local', storage=Local(), result=LocalResult()) as run_local_flow:
    foo()

run_local_flow.register("proj")

with Flow('run_docker', storage=Docker(), result=LocalResult()) as run_docker_flow:
    foo()

run_docker_flow.register("proj")
When I kick off a run of run_local_flow, the foo task produces a result on my host filesystem (e.g. /Users/ben.fogelson/.prefect/results/prefect-result-2020-09-15t15-11-21-771486-00-00) as expected. When I kick off a run of run_docker_flow, the UI says that there is a result saved to an analogous location, but there isn't anything on my host filesystem (I'm guessing it is getting saved to the container filesystem).
Being able to run a flow with Docker storage but inspect the results locally would be super useful for development.
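One workaround to sketch here (assuming a Docker agent started with a volume mount; the host and container paths below are hypothetical): mount a host directory into the flow-run container and point LocalResult at that mount, so results written inside the container also land on the host:
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.environments.storage import Docker

# Hypothetical paths; the agent must mount the host directory, for example
#   DockerAgent(volumes=["/Users/ben.fogelson/prefect-results:/prefect-results"]).start()
result = LocalResult(dir="/prefect-results")

@task
def foo():
    return "foo"

with Flow("run_docker", storage=Docker(), result=result) as run_docker_flow:
    foo()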
09/15/2020, 5:12 PM
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://{name}.us-east-1.elb.amazonaws.com:4200/graphql
There's an env variable with my host name in the .env file.
Samuel Koshy
09/15/2020, 5:54 PM
Marvin
09/15/2020, 5:54 PM
Chandra Manginipalli
09/15/2020, 6:37 PM
mutation($state: JSON!) {
  set_flow_run_state(input: {flow_run_id: "83b4e6be-b1ac-416c-833c-236c23f5e1ca", version: 5, state: $state}) {
    id
  }
}
{
  "state": {
    "type": "Success",
    "message": "It worked!"
  }
}
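For anyone scripting this instead of using the interactive API, here is a minimal sketch of the equivalent call through the Python client (assuming Client.set_flow_run_state takes flow_run_id, version, and a State, as in Prefect 0.13; the IDs are taken from the mutation above):
from prefect import Client
from prefect.engine.state import Success

# Mark the flow run as Successful with a message, mirroring the GraphQL mutation.
client = Client()
client.set_flow_run_state(
    flow_run_id="83b4e6be-b1ac-416c-833c-236c23f5e1ca",
    version=5,
    state=Success(message="It worked!"),
)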
Chandra Manginipalli
09/15/2020, 6:38 PM
kiran
09/15/2020, 6:52 PM
Marvin
09/15/2020, 6:52 PM
kevin
09/15/2020, 7:37 PM
With Client().create_flow_run(), what is the best way to inject some custom context? Would it be something like this at a high level:
client = Client()
with context(run_id=some_run_id):
    client.create_flow_run()
As a follow-up, the use case for the above question is that we want to inject a custom run_id into the env variable PREFECT__LOGGING__LOG_ATTRIBUTES="['run_id']" so that our logger can collect that attribute. Would this be the correct approach?
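For reference, a minimal sketch of one way this can look (assuming create_flow_run's context argument, which merges the supplied dict into prefect.context for that run; the flow_id and run_id values are placeholders):
from prefect import Client

client = Client()
# Pass the custom context directly to the flow run; it becomes available in
# prefect.context (and therefore to log attributes) during execution.
client.create_flow_run(
    flow_id="<your-flow-id>",
    context={"run_id": "my-custom-run-id"},
)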
Brett Naul
09/15/2020, 10:39 PM
Resources
The current default resource usage of a prefect-job has a request and limit for CPU of 100m and the agent limits itself to 128Mi for memory and 100m for CPU. Make sure your cluster has enough resources that it does not start to get clogged up with all of your flow runs. A more customizable Kubernetes environment is on the roadmap!
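If those defaults are too small, one possible way to override them (a sketch only, assuming Prefect 0.13's KubernetesJobEnvironment and a hypothetical job_spec.yaml stored alongside the flow file) is to attach a custom job spec with explicit resource requests and limits:
from prefect import Flow, task
from prefect.environments import KubernetesJobEnvironment

@task
def hello():
    return "hello"

with Flow("resource-demo") as flow:
    hello()

# job_spec.yaml is a hypothetical Kubernetes Job manifest that sets explicit
# resources (e.g. cpu/memory requests and limits) on the prefect-job container.
flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")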
Vitaly Shulgin
09/15/2020, 10:40 PM
Vitaly Shulgin
09/15/2020, 10:40 PM
Attempting to call `flow.register` during execution of flow file will lead to unexpected results.
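That warning typically appears when flow.register() sits at module level in the flow file, so it runs again whenever the file is re-executed by storage or an agent. A common remedy, sketched here with a hypothetical flow defined in the same file, is to guard registration:
from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("my_flow") as my_flow:
    say_hello()

# Only register when the file is run directly, not when it is re-executed
# as part of loading the flow for a flow run.
if __name__ == "__main__":
    my_flow.register(project_name="my-project")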
Vitaly Shulgin
09/15/2020, 10:41 PM
Vitaly Shulgin
09/15/2020, 10:43 PM
sark
09/16/2020, 3:01 AM
CreateContainer?
sebastian.clanzett
09/16/2020, 6:30 AM
Julian
09/16/2020, 6:45 AM
Klemen Strojan
09/16/2020, 7:18 AM
# schedule
daily_schedule = CronSchedule("0 7 * * *", start_date=pendulum.now(tz="Europe/Vienna"))

# result
RESULT = AzureResult('prefect', connection_string_secret='my_azure_secret')

with Flow("master_flow", schedule=daily_schedule, result=RESULT) as flow:
    flow_1_param_1 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '1'})
    flow_1_param_2 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '2'})
    flow_1_param_3 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '3'})
    flow_2 = FlowRunTask(
        flow_name="flow_2",
        project_name="project_1",
        wait=True)
    flow_3 = FlowRunTask(
        flow_name="flow_3",
        project_name="project_1",
        wait=True)
    flow_1_param_1.set_downstream(flow_1_param_2)
    flow_1_param_2.set_downstream(flow_1_param_3)
    flow_1_param_3.set_downstream(flow_2)
    flow_2.set_downstream(flow_3)

###############################################################################
# Settings
flow.storage = GitHub(
    repo="some_repo",
    path="some_path",
    secrets=["GITHUB_ACCESS_TOKEN"]
)

# Labels for identifying agents
flow.environment = LocalEnvironment(labels=["label_1", "label_2"])
We are using Cloud and have two agents with local Dask clusters. Labels from the master flow point towards one agent, and labels from all other flows point towards the second agent. When we run the master flow, everything runs normally, with no errors or warnings in the log. But in reality, only flow_1_param_1, flow_2, and flow_3 ran. What are we missing?
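For comparison, here is a more explicit wiring pattern (a sketch only, not a confirmed diagnosis of the behaviour above): call each FlowRunTask inside the flow context and chain the calls with upstream_tasks, so every run is unambiguously a distinct task in the master flow:
from prefect import Flow
from prefect.tasks.prefect import FlowRunTask

start_flow_run = FlowRunTask(project_name="project_1", wait=True)

with Flow("master_flow") as flow:
    # Each call copies the task, so three "flow_1" runs stay separate tasks.
    run_1 = start_flow_run(flow_name="flow_1", parameters={"param": "1"})
    run_2 = start_flow_run(flow_name="flow_1", parameters={"param": "2"}, upstream_tasks=[run_1])
    run_3 = start_flow_run(flow_name="flow_1", parameters={"param": "3"}, upstream_tasks=[run_2])
    run_4 = start_flow_run(flow_name="flow_2", upstream_tasks=[run_3])
    run_5 = start_flow_run(flow_name="flow_3", upstream_tasks=[run_4])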
Robin
09/16/2020, 8:22 AM
Dinu Gherman
09/16/2020, 11:59 AM