Powered by Linen
prefect-community
  • r

    Riley Hun

    08/14/2020, 4:31 PM
    Hello friends - I just wanted to take the time to thank all of you for all the help these past few days. Was able to finish my first flow ingesting external data into our internal systems, and it works beautifully. Looking forward to sharing the framework with my colleagues!
    🚀 5
    ❤️ 6
  • b

    bral

    08/14/2020, 7:08 PM
    Hi again :party-parrot: Is there something like a conditional operator? If the previous task succeeded, continue; otherwise interrupt. Do I need to use a result handler?
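For illustration, the "continue only if the previous task succeeded" behavior can be sketched in plain Python; Prefect expresses this with task triggers (the default trigger only runs a task when its upstream tasks succeeded), so this is just the pattern, not Prefect's API:

```python
# Plain-Python sketch of "stop at the first failure" -- illustrative only.
def run_pipeline(tasks):
    """Run tasks in order; stop at the first failure."""
    results = []
    for t in tasks:
        try:
            results.append(t())
        except Exception as exc:
            results.append(exc)
            break  # interrupt: downstream tasks never run
    return results

def boom():
    raise RuntimeError("task failed")

results = run_pipeline([lambda: "ok", boom, lambda: "never runs"])
```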
  • j

    Jeff Brainerd

    08/14/2020, 7:39 PM
    Hey :prefect: team, a question about Dask interaction. We are looking for best practices for interrupting and then restarting a flow. Our use case is that we need to sever all database connections during a database migration. We would like to avoid restarting the flow from the beginning, but we are fine with re-starting any running tasks from the beginning (in fact I don't think we have a choice there). We are thinking of the following steps:
    • pause all running tasks using the GraphQL API
    • scale down Dask workers to 0 (we run in Fargate)
    • scale up Dask workers after the migration
    • unpause the tasks
    Would love to hear any thoughts. Thanks! 🙏
    👀 3
  • j

    Jeremiah

    08/14/2020, 8:06 PM
    We’ve got about a third of the Prefect team and @Nate Atkins chatting over in the Core Cantina right now if anyone wants to join — @Jeff Brainerd that’s a great question, let’s discuss if you can make it! https://prefect-community.slack.com/archives/C0106HZ1CMS/p1597433468091100
    🚀 1
    👀 1
  • m

    Michael Reeves

    08/14/2020, 8:24 PM
    I have a quick question: how can I set a flow_run_id within the call of a FLOW_OBJ.run()? I'm currently doing the following:
    with prefect.context(flow_run_id="asdf"):
        EMAIL.run(flow_run_id="asdfasdf", name="TEST")
    Neither the context nor the kwarg passed to the run function actually changes the flow_run_id when it's output in the logger 😞 Do I need to wrap the flow_run_id in a state variable? If so, what's the best way to do this? Finally, I'm running this flow over a Dask cluster, so I don't know if my issue is affected by this one: https://github.com/PrefectHQ/prefect/issues/2883
  • p

    Philip MacMenamin

    08/14/2020, 9:15 PM
    I created a project vtk_flow using the web GUI, and am attempting to register a flow:
    $ python3 -m vtk_flow.flow
    Traceback (most recent call last):
      File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
        "__main__", mod_spec)
      File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/nih-3d-resource-workflows/vtk_flow/flow.py", line 82, in <module>
        f.register(project_name='vtk_flow')
      File "/prefect-env/lib64/python3.7/site-packages/prefect/core/flow.py", line 1583, in register
        no_url=no_url,
      File "/prefect-env/lib64/python3.7/site-packages/prefect/client/client.py", line 729, in register
        project_name, project_name
    ValueError: Project vtk_flow not found. Run `client.create_project("vtk_flow")` to create it.
    I thought Client was for talking to Prefect Cloud?
  • h

    Hannah Amundson

    08/14/2020, 10:13 PM
    Hi! I need a specific task to run if a previous task was completed + it is a certain day of the week. Is there a way to use the schedule component for filters?
    👀 2
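If schedule filters turn out not to fit, one lightweight alternative is to gate the task on the weekday itself. A minimal sketch (Monday is 0 in Python's datetime; the function name is just for illustration):

```python
import datetime

# Sketch: only proceed on a given weekday. A task could check this
# itself and skip otherwise. Monday == 0 in Python's datetime module.
def is_run_day(today=None, weekday=0):
    today = today or datetime.date.today()
    return today.weekday() == weekday

print(is_run_day(datetime.date(2020, 8, 17)))  # 2020-08-17 was a Monday
```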
  • k

    kevin

    08/14/2020, 10:17 PM
    Hey guys, is it "prefect legal" to inject context into prefect.context at runtime? More concretely, can I do something like this:
    with Flow('test') as flow:
        do_stuff()
    
    with prefect.context(foo='bar'):
        flow.run()
    and if not, what would be the correct way to inject context after defining a flow?
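For reference, the temporary-override behavior being asked about can be sketched generically; this is just the dict-override pattern behind a `with context(...)` block, not Prefect's actual implementation:

```python
import contextlib

# Generic sketch of a context object supporting temporary overrides.
# Values set inside the `with` block disappear when it exits.
class Context(dict):
    @contextlib.contextmanager
    def __call__(self, **overrides):
        saved = dict(self)
        self.update(overrides)
        try:
            yield self
        finally:
            self.clear()
            self.update(saved)

context = Context(flow_name="test")
with context(foo="bar"):
    inside = context["foo"]   # "bar" while the block is active
after = context.get("foo")    # None once the block exits
```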
  • m

    Maxwell Dylla

    08/14/2020, 10:53 PM
    💃 Just arrived! Hello Prefect world
    :party-parrot: 1
    👋 5
  • m

    Marwan Sarieddine

    08/15/2020, 4:02 AM
    Hi folks, is there a way to force the restart of a task from within the UI? Some tasks are getting stuck indefinitely in a Pending state and I am trying to force the flow to attempt to run them...
  • t

    tas

    08/15/2020, 10:34 AM
    Hi all, I am looking at setting up Prefect on GCP and I did go through the Medium post about that. I am looking for a serverless solution to run the flows that won't have the overhead of setting up k8s. Something like Fargate on AWS?
  • a

    Alex Papanicolaou

    08/15/2020, 6:02 PM
    Hi, I have a quick question on cached tasks: will Prefect skip parts of the graph that are cached and don’t need to be computed so it can jump to only the tasks that do need computation?
  • a

    Ame

    08/16/2020, 4:02 AM
    💃 Just arrived!
    👋 4
    :marvin: 3
  • j

    John Ramirez

    08/16/2020, 7:25 PM
    Are you guys doing a Cloud release? I am unable to view my flow in Cloud.
  • a

    Ame

    08/17/2020, 3:27 AM
    Hello everyone, I have some questions regarding the Prefect agent. First, how should I choose the number of agents? Is there a way to calculate, or a common way to decide, this number? Second, is a single agent enough to handle (at most) 1000 queries per hour? The server is running with a local environment and using a DaskExecutor (a local Dask cluster).
  • a

    Alfie

    08/17/2020, 5:28 AM
    Hello guys, I’d like to log some info in state handlers, such as the flow version and flow run id. Is that possible? How can I get them? Thanks!
  • l

    Lewis Bails

    08/17/2020, 6:41 AM
    Hi all, Is it possible to control how many mapped tasks are running at any one time? I have a memory issue, as my mapped task requires loading in some data, and no matter how many workers or threads I specify in the DaskExecutor, the tasks just keep spawning.
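The general pattern for capping concurrent mapped work is to bound the worker pool rather than the number of submitted items; here is a plain-Python sketch of that idea (the Dask-specific knobs are a separate question):

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch: at most `max_workers` items are in flight at once, no matter
# how many items are mapped over. Illustrates the concept only.
def bounded_map(fn, items, max_workers=2):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))  # order of results is preserved

squares = bounded_map(lambda x: x * x, range(6))  # [0, 1, 4, 9, 16, 25]
```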
  • s

    Simone Cittadini

    08/17/2020, 9:26 AM
    Hi all, question about the latest server/UI: I have Prefect installed as a Docker swarm. If I pull the latest images (tried "alpha" and "latest"), the UI asks me to create a tenant with the command
    prefect server create-default-tenant --name default
    but from the shell (inside the graphql container) I get
    Error: No such command 'create-default-tenant'.
    prefect version
    0.13.1+14.g303356902
    EDIT: worked with master (Prefect v0.13.2), great!
    🚀 3
  • a

    Adam

    08/17/2020, 10:31 AM
    Happy Monday everyone! Are there any best practices for sharing “clients” between tasks? By clients I mean database clients/connections, API clients etc. I would assume there’s some care to be taken here due to the distributed nature of the tasks.
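One common pattern for this (an assumption on my part, not an official Prefect recommendation): construct the client inside the task and memoize it per process, so nothing unpicklable crosses worker boundaries. `FakeClient` below is a hypothetical stand-in for any DB/API client:

```python
import functools

class FakeClient:
    """Stand-in for a real DB/API client; __init__ would connect."""
    def __init__(self, dsn):
        self.dsn = dsn

@functools.lru_cache(maxsize=None)
def get_client(dsn):
    # Built lazily on first use in each worker process, then reused,
    # instead of creating one client and pickling it between tasks.
    return FakeClient(dsn)

a = get_client("postgres://example")
b = get_client("postgres://example")
assert a is b  # the same connection object is reused within a process
```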
  • a

    Alex Papanicolaou

    08/17/2020, 4:52 PM
    Hi, we made a small adaptation to the task runner to make it run a little faster locally, so we could use Prefect as a slick local DAG executor. When running one of these flows within a regular flow, we get all the logs for both flows, when we really don’t want the inner flow’s logs. How would we shut off logs for the inner flow? I was thinking of wrapping the inner flow like this:
    with prefect.context(loglevel="ERROR"):
        inner_flow.run()
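The generic version of this idea is to temporarily raise a logger's threshold around the inner call. A small sketch; the `"prefect"` logger name is what Prefect 0.x uses, but treat that as an assumption:

```python
import contextlib
import logging

# Sketch: temporarily raise a logger's level so only errors pass through.
@contextlib.contextmanager
def quiet(name, level=logging.ERROR):
    logger = logging.getLogger(name)
    previous = logger.level
    logger.setLevel(level)
    try:
        yield
    finally:
        logger.setLevel(previous)  # restore the old threshold

with quiet("prefect"):
    # inner_flow.run() would go here; only ERROR and above get through
    assert logging.getLogger("prefect").getEffectiveLevel() == logging.ERROR
```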
  • m

    Mitchell Bregman

    08/17/2020, 7:23 PM
    Hi all, I am looking to create a dynamic task DAG, which essentially calls a database for all task definitions and dynamically generates tasks. Here is a code snippet:
    class DS242(ParkMobileFlow):
    
        @property
        def tasks(self):
            """Loads all tasks for data quality tests."""
    
            # Connect to ParkMobile client
            pm = ParkMobile()
    
            # Get Snowflake connection
            sf = pm.get("snowflake")
            sf.connect("sqlalchemy")
    
            # Call all tasks and task definitions
            records = sf.execute("""
                SELECT *
                FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
                WHERE is_active = true;
            """)
    
            return records
    
        @property
        def flow(self):
            """Returns the Prefect Flow object."""
            return self.build()
    
        def build(self):
            """Builds the workflow."""
            tasks = []
            for task in self.tasks:
                tasks.append(
                    DataQualityTask(
                        uuid=task.task_uuid,
                        name=task.task_name,
                        description=task.task_description,
                        source_query=task.source_query,
                        target_query=task.target_query,
                    )
                )
    
            with Flow("DS242__data-quality-tests") as flow:
                for task in tasks:
                    task()
    
            return flow
    Now, I am trying to register this flow to the backend server, and am getting a serialization error, which is understandable. My question is: can I create a first Task that queries the DB for all task definitions and then dynamically adds new tasks to the Flow, which performs all of them? I can certainly build a custom Dockerfile that handles deployment of this, but it would be awesome to connect to prefect server.
  • j

    Julien Allard

    08/17/2020, 8:46 PM
    Hello all! I have a question about scaling to Kubernetes + Dask in GCP. I am trying to run a flow that uses pandas for data transformation and requires a large amount of memory, more than can be allocated on a single node of our current cluster configuration. Here are some of the ideas I had to fix the problem:
    1. I could parallelize my flow using Dask DataFrame, but that seems to be a lot of effort.
    2. We could vertically scale the nodes, but then we would pay for more compute than we need most of the time.
    3. Create a preemptible node pool where the flow can be executed.
    I feel like the 3rd option could be a good solution, but I am not sure how it would work to allocate a preemptible node pool at the start of a flow and then execute the flow on the new node pool, since we have already deployed the job or Dask cluster on the other nodes. So I guess my question is: what is the best way to achieve what I'm trying to do without having to vertically scale the cluster? Thank you!
  • b

    bral

    08/17/2020, 10:36 PM
    Good evening. I ran into the following problem, which I find difficult to solve. My flow looks something like this: And here's an example: I am using the LocalAgent, and this flow will be executed sequentially for each file. What is the correct way to parallelize it? Use multiprocessing? I am more inclined to use the Dask executor; I looked at several examples, but nowhere was the flow registered, it was simply executed. What is the best solution to parallelize the calculations? (The best option, I assume, is that each node of the executor takes a separate file for itself.)
  • a

    Adam

    08/18/2020, 9:45 AM
    Good morning friends. A quick question on debugging with logs shown on the Prefect Cloud UI. I have some error logs buried among my info logs, but I’m having a hard time displaying them while keeping the info logs visible. I can filter for error logs, but then I want to see the logs surrounding those (i.e. the info logs) but I can’t seem to find a clever way of doing that in the Prefect Cloud UI. Any thoughts? Downloading the logs would also be an option but it seems it only downloads logs for the rendered page.
  • x

    x062Wyhdolq

    08/18/2020, 9:53 AM
    Hi everyone! Can I ask for help here?
  • j

    Jacob Blanco

    08/18/2020, 9:57 AM
    I'm going crazy. I'm trying to pull a secret in a class task, and the type it returns is <class 'prefect.configuration.Config'>, and I have no idea why. Did I do something wrong in the config.toml file??
  • x

    x062Wyhdolq

    08/18/2020, 9:58 AM
    I have created a Python script. How can I run it in order to see the scheduling and flow status in the UI?
  • j

    Jonas Hanfland

    08/18/2020, 2:03 PM
    Hey guys, I'm trying to implement some basic task dependencies, but some of my tasks appear multiple times in the graph that flow.visualize() generates. In the generated graph I would like Add new columns to depend on Extract customers, while Extract customers should itself depend on Unnest passport data. In the code, how do I add the Unnest passport data dependency to the existing .set_dependencies() block without causing duplicates? Thanks in advance!
  • b

    bral

    08/18/2020, 4:10 PM
    Hi all! I have a problem with flow again :)
  • p

    Philip MacMenamin

    08/18/2020, 5:42 PM
    How do I specify params in GQL while trying to run a job? E.g. if I have params called input_loc and submission_id, how do I define those args in GQL?
    mutation {
      create_flow_run(
        input: {
          flow_id: "dcbbfc94-5643-453c-941a-0efa8505df66"
          parameters: {"input_loc": "test"}
        }
      ) {
        id
      }
    }

Philip MacMenamin

08/18/2020, 5:42 PM
as in, normally I'd just drop something like this JSON into the GUI
{
  "files": [
    "fname.txt",
    "fname2.txt"
  ],
  "input_loc": "/test_job_loc/",
  "submission_id": "test_job_id"
}

Jenny

08/18/2020, 6:17 PM
Hi @Philip MacMenamin - let me check that for you.

Philip MacMenamin

08/18/2020, 6:18 PM
Thanks Jenny - I'm sure it's a dumb question; I'm just not getting the exact syntax. This is where I was looking: https://docs.prefect.io/orchestration/concepts/flow_runs.html#scheduling-a-flow-run
👍 1

Jenny

08/18/2020, 6:49 PM
Hi @Philip MacMenamin - took a bit of json-figuring but here's an example:
mutation {
  create_flow_run(input: {
    flow_id: "xxxxxxxxx",
    parameters: "{\"a\":2}"
    }
  ) {
    id
  }
}
I'll make a note to add that to the docs - at least to help me remember for next time!
👍 1

Philip MacMenamin

08/18/2020, 6:50 PM
ok, so you're having to escape every " ?
and then wrap the whole thing in "'s - ok. I see. I'll test
yup, that's the one. Thanks!
👍 1
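Jenny's escaping trick generalizes: serialize the parameters dict to JSON, then escape that result again so it can sit inside the mutation as a GraphQL string literal. A small hypothetical helper, stdlib-only:

```python
import json

# Hypothetical helper: embed a parameters dict in the create_flow_run
# mutation as an escaped JSON string, as in the example above.
def build_create_flow_run(flow_id, parameters):
    # json.dumps twice: once to serialize the dict, once to escape the
    # result as a GraphQL string literal, e.g. "{\"a\": 2}".
    params_literal = json.dumps(json.dumps(parameters))
    return (
        "mutation {\n"
        "  create_flow_run(input: {\n"
        f'    flow_id: "{flow_id}",\n'
        f"    parameters: {params_literal}\n"
        "  }) {\n"
        "    id\n"
        "  }\n"
        "}"
    )

print(build_create_flow_run("dcbbfc94-5643-453c-941a-0efa8505df66",
                            {"input_loc": "/test_job_loc/"}))
```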