prefect-community
  • Rodrigo Neves · 09/15/2020, 9:50 AM
    @Marvin are you a robot?
  • Marvin · 09/15/2020, 9:50 AM
    I’d give you advice, but you wouldn’t listen. No one ever does. Good luck in the contest @Rodrigo Neves.
  • Rob Fowler · 09/15/2020, 10:08 AM
    @Marvin why won't you enter me?
  • Marvin · 09/15/2020, 10:08 AM
    I’d give you advice, but you wouldn’t listen. No one ever does. Good luck in the contest @Rob Fowler.
  • Rob Fowler · 09/15/2020, 10:09 AM
    Replaced all the broken bits of Ansible with my pywinrm fork, Vault, and Prefect.
    🚀 2
    🤩 1
  • Nuno Silva · 09/15/2020, 10:13 AM
    Hi. I'm using DaskExecutor with LocalEnvironment. The agent runs locally; the Dask cluster is running in Azure AKS. Randomly I get the error: OSError: Timed out trying to connect to 'tcp://dask-user-44479c3f-5.pipelines:8786' after 3 s. I tried to increase the default timeout with dask.config.set({"distributed.comm.timeouts.connect": 20}), but apparently to no avail; the 3 s timeout is kept. Any ideas how to increase the timeout? Thanks
  • Rob Fowler · 09/15/2020, 10:28 AM
    can you set it with kwargs on your DaskExecutor initialisation?
    12 replies · 4 participants
  • Rob Fowler · 09/15/2020, 10:30 AM
    timeout=XX ?
  • Rob Fowler · 09/15/2020, 10:33 AM
    if that's not handled, it also takes a dict via cluster_kwargs = {...}
  • Rob Fowler · 09/15/2020, 10:34 AM
    https://github.com/PrefectHQ/prefect/blob/65155e9b066a092c22c202def31293e9dd580f5f/src/prefect/engine/executors/dask.py#L79
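A minimal sketch of that suggestion, assuming the client_kwargs parameter visible in the linked source (the address is the one from the error above); these kwargs are forwarded to dask.distributed.Client, whose timeout argument controls the connection timeout in seconds:

from prefect.engine.executors import DaskExecutor

# Sketch: client_kwargs are passed through to dask.distributed.Client;
# `timeout` overrides the 3 s connection default.
executor = DaskExecutor(
    address="tcp://dask-user-44479c3f-5.pipelines:8786",
    client_kwargs={"timeout": 30},
)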
  • Julian · 09/15/2020, 11:51 AM
    Hey all, how would you write a (complex) schedule to trigger a flow run on both the first of the week and the first of the month, but only once if first_of_the_week == first_of_the_month? Is this possible using just a schedule and filters, or do I need to check this condition within the flow run and stop one of the scheduled runs? It's crucial that the flow run is not executed twice just because both conditions are met.
    12 replies · 3 participants
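One way to express this, sketched under the assumption that Schedule accepts custom filter functions (the filter name is illustrative): use a single daily clock plus a filter that passes Mondays and first-of-month days, so a date satisfying both conditions still yields exactly one run.

from datetime import timedelta

import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

# Sketch: one daily clock, narrowed by a custom filter, so a day that is both
# a Monday and the 1st of the month fires only once.
def first_of_week_or_month(dt: pendulum.DateTime) -> bool:
    return dt.day_of_week == pendulum.MONDAY or dt.day == 1

schedule = Schedule(
    clocks=[IntervalClock(timedelta(days=1))],
    filters=[first_of_week_or_month],
)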
  • kursat.t · 09/15/2020, 2:26 PM
    Hi there, we are looking for an Azkaban alternative and are evaluating Airflow and Prefect. Our daily batch computation flow has 800+ jobs. I wonder: what is the max number of tasks that Prefect can handle in a single flow?
    5 replies · 3 participants
  • Ben Fogelson · 09/15/2020, 3:17 PM
    Is it possible to use LocalResult to save results from a flow running in a Docker container to the host filesystem? Here’s a toy example of what I’m trying to do:
    from prefect.environments.storage import Docker, Local
    from prefect.engine.results import LocalResult
    from prefect import Flow, task
    
    @task
    def foo():
        return "foo"
    
    with Flow('run_local', storage=Local(), result=LocalResult()) as run_local_flow:
        foo()
    run_local_flow.register("proj")
    
    with Flow('run_docker', storage=Docker(), result=LocalResult()) as run_docker_flow:
        foo()
    run_docker_flow.register("proj")
    When I kick off a run of run_local_flow, the foo task produces a result on my host filesystem (e.g. /Users/ben.fogelson/.prefect/results/prefect-result-2020-09-15t15-11-21-771486-00-00) as expected. When I kick off a run of run_docker_flow, the UI says that there is a result saved to an analogous location, but there isn’t anything on my host filesystem (I’m guessing it is getting saved to the container filesystem). Being able to run a flow with Docker storage but inspect the results locally would be super useful for development.
    4 replies · 4 participants
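One workaround sketch, assuming the flow's container can be given a bind mount from the host (the /results path and host directory are hypothetical): point LocalResult at a directory inside the container that is mounted from the host, so writes land on the host filesystem.

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.environments.storage import Docker

@task
def foo():
    return "foo"

# Sketch: /results must be bind-mounted from the host when the flow's
# container runs (e.g. -v /Users/ben.fogelson/results:/results) for the
# results to appear on the host.
with Flow('run_docker', storage=Docker(), result=LocalResult(dir="/results")) as run_docker_flow:
    foo()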
  • Kyle Pierce · 09/15/2020, 5:12 PM
    Is there any documentation or are there blog articles on how to register a flow on Docker? I have the container set up and running but I can't register a flow. I'm getting an error with the GraphQL URL: requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://{name}.us-east-1.elb.amazonaws.com:4200/graphql. There's an env variable with my hostname in the .env file.
    10 replies · 2 participants
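A sketch of one thing to check, assuming the api_server argument on Client (the hostname is a placeholder): registration goes through the client's configured GraphQL endpoint, so make sure it resolves to the load balancer, either via config (e.g. PREFECT__CLOUD__API) or explicitly:

from prefect import Client

# Sketch: point the client at the Prefect Server API behind the ELB
# (placeholder hostname). Note that flow.register() builds its own Client
# from config, so the same address must be set there too.
client = Client(api_server="http://<your-elb-hostname>.us-east-1.elb.amazonaws.com:4200")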
  • Samuel Koshy · 09/15/2020, 5:54 PM
    @Marvin
  • Marvin · 09/15/2020, 5:54 PM
    It gives me a headache just trying to think down to your level. I'll still enter you in the contest, @Samuel Koshy.
    👍 1
  • Chandra Manginipalli · 09/15/2020, 6:37 PM
    Anyone know how to pass GraphQL variables in the Interactive API? I tried the following with no luck for state: Syntax Error: Expected Name, found "{"
    mutation($state: JSON!) {
      set_flow_run_state(input: {flow_run_id: "83b4e6be-b1ac-416c-833c-236c23f5e1ca", version: 5, state: $state}) {
        id
      }
    }
    {
      state: {
        type: "Success"
        message: "It worked!"
      }
    }
    👀 1
    3 replies · 2 participants
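The mutation above is fine on its own; "Syntax Error: Expected Name" usually means the variables object was pasted into the query editor or written without quoted keys. In the Interactive API, the Query Variables pane must be valid JSON, e.g.:

{
  "state": {
    "type": "Success",
    "message": "It worked!"
  }
}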
  • Chandra Manginipalli · 09/15/2020, 6:38 PM
    Also, I was able to run a flow successfully. Do you happen to know how to issue pause/resume queries in GraphQL for a flow run?
    10 replies · 2 participants
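A hedged guess, reusing the set_flow_run_state mutation from the previous message and assuming pause/resume are ordinary state changes (Prefect defines Paused and Resume state types): run the same mutation with the variables below to pause the run, then swap the type to "Resume" to continue it.

{
  "state": {
    "type": "Paused"
  }
}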
  • kiran · 09/15/2020, 6:52 PM
    @Marvin
  • Marvin · 09/15/2020, 6:52 PM
    Don't feel you have to take any notice of me, please. I'll just enter you in the contest @kiran.
  • kevin · 09/15/2020, 7:37 PM
    When calling Client().create_flow_run(), what is the best way to inject some custom context? Would it be something like this at a high level:
    client = Client()
    with context(run_id=some_run_id):
        client.create_flow_run()
    As a follow-up, the use case for the above question is that we want to inject a custom run_id into the env variable PREFECT__LOGGING__LOG_ATTRIBUTES="['run_id']" so that our logger can collect that attribute. Would this be the correct approach?
    4 replies · 2 participants
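A sketch of an alternative, assuming the context argument on Client.create_flow_run, which sets context inside the flow run itself rather than in the process that creates it:

from prefect import Client

# Sketch: values passed via `context` should surface as
# prefect.context.run_id inside the run, where the log attribute is read.
client = Client()
client.create_flow_run(
    flow_id="<flow-id>",
    context={"run_id": "some-run-id"},
)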
  • Brett Naul · 09/15/2020, 10:39 PM
    doesn't feel like it's worth a GitHub issue, but this is already done, right? https://docs.prefect.io/orchestration/agents/kubernetes.html
    Resources: The current default resource usage of a prefect-job has a request and limit for CPU of 100m, and the agent limits itself to 128Mi for memory and 100m for CPU. Make sure your cluster has enough resources that it does not start to get clogged up with all of your flow runs. A more customizable Kubernetes environment is on the roadmap!
    2 replies · 2 participants
  • Vitaly Shulgin · 09/15/2020, 10:40 PM
    Hello team
  • Vitaly Shulgin · 09/15/2020, 10:40 PM
    Attempting to call `flow.register` during execution of flow file will lead to unexpected results.
  • Vitaly Shulgin · 09/15/2020, 10:41 PM
    I tried to generate and register a flow from within a running flow, and got the above error message. Is it possible to register a new flow from a running flow?
  • Vitaly Shulgin · 09/15/2020, 10:43 PM
    Or, what would be a solution for implementing such functionality?
    3 replies · 2 participants
  • sark · 09/16/2020, 3:01 AM
    hi guys, from the docs I know how to pass a parameter to a new task I define myself, but how do I pass parameters to an existing task like CreateContainer?
    22 replies · 2 participants
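The usual pattern, sketched with assumed defaults: instantiate the task once, then pass runtime arguments (including Parameters) when calling it inside the flow; call-time arguments are forwarded to the task's run method.

from prefect import Flow, Parameter
from prefect.tasks.docker import CreateContainer

create_container = CreateContainer()

with Flow("docker-flow") as flow:
    image = Parameter("image_name", default="python:3.8-slim")
    # Calling the instance inside the flow passes runtime arguments,
    # including Parameter tasks, through to CreateContainer.run().
    container_id = create_container(image_name=image)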
  • sebastian.clanzett · 09/16/2020, 6:30 AM
    Hi guys. Is it possible to run tasks from a flow on different specialised agents?
    5 replies · 2 participants
  • Julian · 09/16/2020, 6:45 AM
    Hey, I already mentioned my problem in the thread https://prefect-community.slack.com/archives/CL09KU1K7/p1600170685303800 Summarized: if I use a custom filter for my schedule and register the flow (with this schedule attached to it), it throws an error.
  • Klemen Strojan · 09/16/2020, 7:18 AM
    Hey all, we have a master flow that looks like this:
    # schedule
    daily_schedule = CronSchedule("0 7 * * *", start_date=pendulum.now(tz="Europe/Vienna"))
    
    # result
    RESULT = AzureResult('prefect', connection_string_secret='my_azure_secret')
    
    with Flow("master_flow", schedule=daily_schedule, result=RESULT) as flow:
        flow_1_param_1 = FlowRunTask(
            flow_name="flow_1",
            project_name="project_1",
            wait=True,
            parameters={'param': '1'})
        flow_1_param_2 = FlowRunTask(
            flow_name="flow_1",
            project_name="project_1",
            wait=True,
            parameters={'param': '2'})
        flow_1_param_3 = FlowRunTask(
            flow_name="flow_1",
            project_name="project_1",
            wait=True,
            parameters={'param': '3'})
        flow_2 = FlowRunTask(
            flow_name="flow_2",
            project_name="project_1",
            wait=True)
        flow_3 = FlowRunTask( 
            flow_name="flow_3",
            project_name="project_1",
            wait=True)
        flow_1_param_1.set_downstream(flow_1_param_2)
        flow_1_param_2.set_downstream(flow_1_param_3)
        flow_1_param_3.set_downstream(flow_2)
        flow_2.set_downstream(flow_3)
    
    ###############################################################################
    # Settings
    
    flow.storage = GitHub(
        repo="some_repo",
        path="some_path",
        secrets=["GITHUB_ACCESS_TOKEN"]
    )
    
    # Labels for identifying agents
    flow.environment = LocalEnvironment(labels=["label_1", "label_2"])
    We are using Cloud and have two agents with local Dask clusters. The labels on the master flow point towards one agent, and the labels on all other flows point towards the second agent. When we run the master flow, everything runs normally, no errors or warnings in the log. But in reality, only flow_1_param_1, flow_2, and flow_3 ran. What are we missing?
    8 replies · 2 participants
  • nicholas · 09/16/2020, 7:34 AM
Hi @Klemen Strojan - first off: really cool flow! Exciting to see the evolution of a meta-type flow like this. To answer your question, I think this has to do with the idempotency_key on the FlowRunTask. I believe that's populated with the flow run id available in the context from which it's called, meaning the 3 tasks that create runs for flow_1 will be passing the same idempotency_key. My suggestion would be to try defining that explicitly to something like 1, 2, and 3 appended to the flow run id.
  • Klemen Strojan · 09/16/2020, 9:05 AM
Thanks @nicholas! I am checking the source code (https://github.com/PrefectHQ/prefect/blob/2554a489172fb3e81f9c04188221322582bba078/src/prefect/tasks/prefect/flow_run.py#L11) and I am not sure how to change the flow run id. I tried
flow_1_param_1 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '1'})
flow_1_param_2 = FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters={'param': '2'},
        new_flow_context={'flow_run_id': str(flow_1_param_1) + '1'})
but this has the same behaviour as the previous setting. If I call flow_run_id from context, I’ll get the one for my master flow, which doesn’t matter in this case.
  • nicholas · 09/16/2020, 5:16 PM
Try creating a wrapper task that can pass in the flow run context like this:
@task
def flow_1(index: int, parameters: dict):
    return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
    )
Which you can call from your flow context like this:
flow_1_param_1 = flow_1(
    index=1,
    parameters={'param': '1'}
)

flow_1_param_2 = flow_1(
    index=2,
    parameters={'param': '2'}
)

flow_1_param_3 = flow_1(
    index=3,
    parameters={'param': '3'}
)
  • Klemen Strojan · 09/17/2020, 7:16 AM
Creating a wrapper didn’t do the trick; I got TypeError: __init__() got an unexpected keyword argument 'idempotency_key' straight away. I don’t see any other obvious way to pass a value to idempotency_key.
@task
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters,
        idempotency_key=f"{prefect.context.flow_run_id}__{index}"
  )

flow_1_param_1 = flow_1(
        index=1,
        parameters={'param': '1'}
    )

flow_1_param_2 = flow_1(
        index=2,
        parameters={'param': '2'}
  • nicholas · 09/17/2020, 2:06 PM
Ah sorry @Klemen Strojan - I wasn't looking closely enough at the code; you'll want to pass idempotency_key to the run method, not the base class:
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
  )(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
👍 1
  • Klemen Strojan · 09/18/2020, 8:07 AM
Thanks @nicholas, that did the trick (with a small modification pasted below). I appreciate your help! 🙏 More meta-type flows to come 🧐🙂
def flow_1(index: int, parameters: dict):
  return FlowRunTask(
        flow_name="flow_1",
        project_name="project_1",
        wait=True,
        parameters=parameters
  ).run(idempotency_key=f"{prefect.context.flow_run_id}__{index}")
  • nicholas · 09/18/2020, 4:16 PM
Awesome!! Glad you got it working @Klemen Strojan 😄