prefect-community
  • j

    Jason Motley

    04/06/2022, 11:12 PM
    What's the best approach to using a for loop that would normally be within the main section of a standard python script?
    k
    11 replies · 2 participants
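    A minimal sketch of how that loop usually translates to Prefect 1.x, either kept inside the Flow block or turned into task mapping (task and item names below are hypothetical):
    from prefect import Flow, task

    @task
    def process(item):
        # stand-in for the body of the original for loop
        return item * 2

    with Flow("loop-example") as flow:
        items = [1, 2, 3]
        # equivalent of "for item in items: process(item)", run as mapped task runs
        process.map(items)

    flow.run()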
  • s

    Shiyu Gan

    04/07/2022, 2:11 AM
    Is "dependent flows" a common used scenario in the real world in industry?
    k
    3 replies · 2 participants
  • a

    Alan Ning

    04/07/2022, 2:28 AM
    Does Prefect Enterprise support SAML SSO?
    k
    1 reply · 2 participants
  • m

    Moss Ebeling

    04/07/2022, 6:08 AM
    Hi there, I'm looking at some of the new stuff in Orion beta and I'm not sure I follow how storage works in Orion. I can use the helper to create new storage options, but I don't see how to specify which option is used beyond the "server default". Can I specify this in my deployment spec? Inside the flow runner?
    a
    2 replies · 2 participants
  • r

    Ricardo Gaspar

    04/07/2022, 12:11 PM
    I’m migrating my flow code from prefect 0.15 to orion 2.0.b2. Do you know where the resource_manager is located now? Before, I was importing it with "from prefect import resource_manager" (https://docs.prefect.io/core/idioms/resource-manager.html). CC @Anna Geller @Kevin Kho
    a
    5 replies · 2 participants
  • r

    Robert Holmes

    04/07/2022, 1:38 PM
    Hello Everyone! New user of Prefect here 🙂 I am trying to understand if using map makes sense here. Currently, I am looping through a dataframe to get values and assign them as variables to pass to tasks. The code below works fine, but I wanted to know: is mapping a viable solution?
    def prefect_flow():
        with Flow('cloud_reporting_etl') as flow:
            for column, value in jobs_df.iterrows():
                job_name = value['Job Name']
                query = value['Query']
                filename = value['Filename']
                extracted = extract_data(f"""{query}""")
                load_data_to_s3(extracted, filename)
        return flow
    k
    3 replies · 2 participants
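    A hedged sketch of the mapped version, assuming extract_data and load_data_to_s3 are @task-decorated functions and jobs_df is available when the flow is built, as in the snippet above:
    from prefect import Flow

    # build the per-row inputs once, at flow-definition time
    queries = jobs_df["Query"].tolist()
    filenames = jobs_df["Filename"].tolist()

    with Flow("cloud_reporting_etl") as flow:
        # one mapped task run per row instead of a Python-level loop
        extracted = extract_data.map(queries)
        load_data_to_s3.map(extracted, filenames)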
  • b

    Bruno Nunes

    04/07/2022, 2:39 PM
    Hello, is there a way (without Prefect Cloud) to set up security around Prefect UI user capabilities to run/delete/read flows?
    k
    4 replies · 2 participants
  • r

    Ricardo Gaspar

    04/07/2022, 3:45 PM
    I’m running orion 2.0b2 and starting the server (self-hosted, on an EC2 machine) with a different IP and port. I’m able to connect to it via browser (I’ve set all the security groups and routes to allow that). However, it seems that internally Prefect Orion is still interacting with the default host and port, so the UI doesn’t render all the info (see screenshot in the thread). Is there something extra I need to change?
    k
    m
    +1
    7 replies · 4 participants
  • a

    Anders Segerberg

    04/07/2022, 5:12 PM
    What is the best practice for the following situation? I have tens of thousands of mapped tasks. Suppose that each writes to a remote database. If that database goes down, I will see cascading failures. What I would like to do is a) manage the threshold of allowed failures across the aggregate mapped tasks, b) halt the workflow when that threshold is reached, and c) be able to resume the workflow from the point of failure. (c) would ostensibly require caching behavior, I would think. But since there are no dependencies between the mapped tasks, I don't want to just cache the results of the successful tasks; I just want to somehow remember the mapped tasks that already ran to successful completion, and then, when the flow is restarted / resumed, only map across those that did not run to completion / were not started.
    :discourse: 1
    k
    a
    13 replies · 3 participants
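    For the resume part specifically, one Prefect 1.x pattern is a file-based target per mapped child, so children whose target already exists are skipped on a restart; a rough sketch (result directory and task names are hypothetical):
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    # each mapped child checkpoints its result to a templated location; if the
    # file already exists on a re-run, that child is skipped rather than re-executed
    @task(
        checkpoint=True,
        result=LocalResult(dir="/tmp/prefect-results"),
        target="{task_name}-{map_index}.pkl",
    )
    def write_row(row):
        ...

    with Flow("bulk-write") as flow:
        rows = list(range(1000))  # stand-in for the real mapped inputs
        write_row.map(rows)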
  • h

    Horatiu Bota

    04/07/2022, 5:46 PM
    hi community! is there a good way to invalidate a checkpoint? tried setting cache_validator=prefect.engine.cache_validator.never_use on the task but that doesn't seem to do the trick (running prefect core locally)
    k
    7 replies · 2 participants
  • k

    Kevin Schaper

    04/07/2022, 6:46 PM
    Does Orion support dynamic task naming yet? I tried the @task(name="{val}") syntax, but it didn’t look like the templating syntax was getting interpreted. (I wound up with “{val}” in all of my task names)
    a
    1 reply · 2 participants
  • j

    John Ramey

    04/07/2022, 6:51 PM
    Is it okay to return a fitted scikit-learn object from a task? Any gotchas if I then pass it to a second task which is responsible for calling .predict() using that same object? Reason I’m asking: I thought I read somewhere in the prefect docs to be careful with state in such cases, but I can’t seem to find it.
    a
    k
    3 replies · 3 participants
  • w

    Wei Mei

    04/07/2022, 8:20 PM
    Hi! I have a flow using github storage that is scheduled to run (using prefect cloud). If I push new code and re-register it with the command below, are the currently scheduled runs going to use the new code when triggered?
    prefect register --project $PREFECT_PROJECT_NAME --path flows/ --label prod --no-schedule
    k
    4 replies · 2 participants
  • n

    Naga Sravika Bodapati

    04/08/2022, 6:57 AM
    Hi all, we have been using Prefect for scheduling and monitoring flows as part of our organization's work. We have a set of questions and issues we need answers for; please address them if possible.
    1. We have a Docker Agent managing flows built with Docker Run as the run config and Local Storage, but the agent seems to stop picking up and running flows after a few days (typically 6-7), and even when it is actively polling, the flows are not run. Has this issue been seen before and, if so, what is the solution? We start the docker agent with: nohup prefect agent docker start --token <auth_token> -l <agent_label>
    2. We have a flow which tries to establish a pyodbc connection to MS SQL Server with a connection timeout of 60 seconds, and have the task configured with max_retries=2, retry_delay=timedelta(minutes=1). We expect the task to finish in under 10 minutes, but runs are stretching to over 15-16 hours. Why is this the case? This is also causing a "cannot allocate memory" issue.
    3. We have seen scenarios where a prefect agent goes down suddenly, which can lead to flows failing continuously. How can cases like this be avoided in the future? For example: when an agent goes down, all the scheduled flows pile up and are tagged as Late runs. A few hours later, when the agent is up again, all the late runs start at once and this leads to a memory issue on the machine.
    4. Is there a way in prefect to kill a flow if it is running for more than 'x' amount of time?
    5. Is there a way in prefect to stop a pipeline schedule based on a condition (e.g. if the last three consecutive flow runs fail, then stop the schedule)?
    a
    3 replies · 2 participants
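    Regarding question 4 above, Prefect 1.x tasks accept a timeout in seconds alongside the retry settings mentioned; a small sketch (task name is hypothetical):
    from datetime import timedelta
    from prefect import task

    # fail the task run (and retry up to twice) if a single attempt exceeds 10 minutes
    @task(timeout=600, max_retries=2, retry_delay=timedelta(minutes=1))
    def query_sql_server():
        ...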
  • m

    Moss Ebeling

    04/08/2022, 7:23 AM
    Hey all, two quick questions: 1. Is there a channel specifically for Orion questions? 2. Is there a way to specify a Dockerfile to use rather than an image for DockerFlowRunner? Context is basically that I make heavy use of things like from mypackage.utils import frequently_used_task where my flow is also inside of mypackage. Running outside of a docker container with mypackage installed means I hit an import error immediately.
    a
    4 replies · 2 participants
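    On question 2, if building the image from your own Dockerfile up front is acceptable, a hedged sketch of pointing the beta DockerFlowRunner at an image that already has mypackage installed (deployment fields as in the 2.0b2 docs; names are hypothetical):
    from prefect.deployments import DeploymentSpec
    from prefect.flow_runners import DockerFlowRunner

    DeploymentSpec(
        name="my-deployment",
        flow_location="./my_flow.py",
        # image built from your own Dockerfile, with "pip install mypackage" baked in
        flow_runner=DockerFlowRunner(image="my-registry/mypackage-flows:latest"),
    )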
  • k

    kevin

    04/08/2022, 7:36 AM
    Hey guys, I’m making a database query as a snowflake task inside a Prefect Cloud flow. The output of this query is larger than 1 MB, so when the task tries to call set_task_result() it fails with the error message 'State payload is too large.', which makes sense. My Prefect infrastructure is sitting on a KubernetesJobEnvironment. My understanding is that to store this large result correctly I should follow the documentation here: https://docs.prefect.io/core/concepts/results.html#result-objects Are there any additional considerations I should take into account when setting up correct result storage with this infrastructure?
    a
    8 replies · 2 participants
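    A sketch of the result-object approach from that docs page, assuming the Kubernetes jobs can reach an S3 bucket (bucket name is hypothetical):
    from prefect import task
    from prefect.engine.results import S3Result

    # checkpoint the large query output to S3 instead of the task state payload
    @task(result=S3Result(bucket="my-prefect-results"), checkpoint=True)
    def run_snowflake_query():
        ...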
  • a

    Alex Rogozhnikov

    04/08/2022, 8:09 AM
    Hello, a question about cache. Is there a scheduled task or function that deletes expired cache?
    a
    k
    9 replies · 3 participants
  • o

    Olivér Atanaszov

    04/08/2022, 12:23 PM
    Hi, could someone please elaborate on what "Prefect init job" actually refers to here: https://docs.prefect.io/api/latest/cli/agent.html#kubernetes-install? Are those cpu/mem resource options used for controlling the agent or the jobs?
    a
    m
    5 replies · 3 participants
  • s

    Stephen Lloyd

    04/08/2022, 12:43 PM
    I wrote a naive flow that looks like:
    @task
    def get_table() -> object:
      ... query table
      result: pandas.DataFrame = cursor.fetch_dataframe()
      return result
    
    @task
    def load_to_s3(result: object) -> None:
      awswrangler.s3.to_csv(
        df=result,
        path='s3://bucket/folder/table.csv'
      )
    
    with Flow(...) as flow:
      get_data = get_table()
      save_data = load_to_s3(get_data)
      ...
    I’d like to now extend this to somehow pass a list of tables from the same database to extract and load to s3. Is this possible given this simple template?
    s
    2 replies · 2 participants
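    One way to extend the template above is to map both tasks over a list of table names; a sketch assuming get_table and load_to_s3 are adapted to take the table name (table names below are hypothetical):
    import awswrangler
    import pandas
    from prefect import Flow, task

    @task
    def get_table(table_name) -> pandas.DataFrame:
        ...  # query the given table and return a DataFrame

    @task
    def load_to_s3(result, table_name) -> None:
        awswrangler.s3.to_csv(df=result, path=f"s3://bucket/folder/{table_name}.csv")

    with Flow("extract-and-load") as flow:
        tables = ["customers", "orders", "invoices"]
        frames = get_table.map(tables)
        load_to_s3.map(frames, tables)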
  • d

    Daniel

    04/08/2022, 2:14 PM
    I'm new to Prefect and I'm having difficulty using/understanding arguments with the DbtShellTask. Here's some mock code:
    from prefect.tasks.dbt.dbt import DbtShellTask
    
    dbt_shell_task = DbtShellTask() 
    
    with Flow("nightly_dbt_flow") as flow:
        env = Parameter('env', default='dev')
        run_task = dbt_shell_task(command='dbt run',
                                  environment='dev')
    
    flow.run()
    I receive a TypeError that the keyword argument environment doesn't exist:
    Traceback (most recent call last):
      File "nightly_dbt_flow.py", line 50, in <module>
        environment='dev')
      File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 662, in __call__
        *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
      File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 702, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3015, in bind
        return args[0]._bind(args[1:], kwargs)
      File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3006, in _bind
        arg=next(iter(kwargs))))
    TypeError: got an unexpected keyword argument 'environment'
    Why would environment cause an error but not command? When I remove environment, the task does get attempted without a TypeError. environment and command are both arguments defined in the DbtShellTask class. I'm on prefect version 1.2.0.
    a
    k
    +1
    10 replies · 4 participants
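    If environment is only accepted when the task object is constructed (as the DbtShellTask init signature suggests) rather than when it is called inside the flow, moving it to the constructor may avoid the TypeError; a hedged sketch:
    from prefect import Flow, Parameter
    from prefect.tasks.dbt.dbt import DbtShellTask

    # pass environment when the task object is created, not when it is called
    dbt_shell_task = DbtShellTask(environment="dev")

    with Flow("nightly_dbt_flow") as flow:
        env = Parameter("env", default="dev")
        run_task = dbt_shell_task(command="dbt run")

    flow.run()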
  • k

    Kevin Weiler

    04/08/2022, 4:08 PM
    We’re getting into operations using Prefect and wondering if there is a way to re-run a failed (or perhaps not failed) subtree of the graph while the rest of the DAG is still running. We have a DAG that might run for longer than 24 hours, and we’ve encountered a common situation where one node in the graph fails for exogenous reasons. We’d like to fix the exogenous problem and then re-run that node so the dependent nodes can move on, but without interrupting the already-running nodes.
    k
    8 replies · 2 participants
  • a

    Adi Gandra

    04/08/2022, 5:18 PM
    Hey, I set up Prefect on Fargate EKS on AWS. Everything has been running fine, but now the completed pods are not deleted and are skyrocketing the AWS costs. I see a bunch of pods when I run kubectl get pods (I deleted them manually for now). Any guidance on things I can look at to make sure that nothing is still running and AWS is not going to keep racking up the bill?
    k
    m
    3 replies · 3 participants
  • g

    Gustavo Puma

    04/08/2022, 5:50 PM
    Hi 👋 I started playing around with Prefect today as an orchestration tool for our Databricks jobs. I've managed to trigger jobs successfully, which is great, but I cannot figure out how to run tasks in sequence using the Functional API. I've been browsing through the docs but did not find an example of this with task classes. Here's my flow:
    with Flow("application-etl") as flow:
        conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
    
        bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")
    
        silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")
        silver.set_upstream(bronze)
    
        gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")
        gold.set_upstream(silver)
    
        bronze(databricks_conn_secret=conn)
        silver(databricks_conn_secret=conn)
        gold(databricks_conn_secret=conn)
    
    flow.run()
    I want to run my tasks in the sequence bronze > silver > gold. While this executes, gold or silver are being started before bronze. I don't know if I'm misunderstanding how set_upstream works 🤔 Thanks in advance
    k
    2 replies · 2 participants
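    A hedged sketch of the usual Prefect 1.x pattern: the dependency is set between the results of calling the tasks inside the flow, not between the task objects themselves (job ID constants as in the snippet above):
    from prefect import Flow
    from prefect.tasks.secrets import PrefectSecret
    from prefect.tasks.databricks import DatabricksRunNow

    bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")
    silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")
    gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")

    with Flow("application-etl") as flow:
        conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
        bronze_run = bronze(databricks_conn_secret=conn)
        silver_run = silver(databricks_conn_secret=conn, upstream_tasks=[bronze_run])
        gold_run = gold(databricks_conn_secret=conn, upstream_tasks=[silver_run])

    flow.run()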
  • p

    Patrick Tan

    04/08/2022, 6:48 PM
    Prefect logging question: when I ran the code below from my Mac, both the info and debug rows (2) were displayed, but when I registered and ran the flow from Cloud, only 1 row, the logger.info row, is shown. Please advise.
    logger = prefect.context.get("logger")
    logger.setLevel(logging.DEBUG)
    logger.info('this is test info')
    logger.debug('this is test debug')
    k
    16 replies · 2 participants
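    A hedged sketch of raising the log level for cloud runs through the run-config environment instead of calling setLevel inside the flow (standard Prefect 1.x logging setting; assumes flow is the Flow object being registered):
    from prefect.run_configs import UniversalRun

    # ask the flow-run environment to emit DEBUG logs; Cloud otherwise keeps the default level
    flow.run_config = UniversalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})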
  • w

    wiretrack

    04/08/2022, 9:53 PM
    Hi all. Wondering if anybody has experience running Prefect inside an API context using a third-party worker, like Celery or something. What I mean is, declaring the flow/tasks with Prefect Core, and then calling flow.run() within a Celery task, for example. Anyone?
    k
    6 replies · 2 participants
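    A minimal sketch of what that could look like with Celery (broker URL and names are hypothetical):
    from celery import Celery
    from prefect import Flow, task

    app = Celery("worker", broker="redis://localhost:6379/0")

    @task
    def say_hello():
        print("hello")

    with Flow("celery-wrapped-flow") as flow:
        say_hello()

    @app.task
    def run_prefect_flow():
        # the Celery worker just calls the Prefect Core engine in-process
        state = flow.run()
        return state.is_successful()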
  • b

    Ben Epstein

    04/09/2022, 12:52 PM
    Is there a way to create flows in prefect 2.0 from a class function?
    from prefect import flow
    
    class MyClass:
        def __init__(self, ct: int=0):
            self.count = ct
            
        @flow
        def runit(self):
            print(f"I have been called {self.count} times")
            self.count += 1
            
            
    c = MyClass()
    c.runit()
    I get the error TypeError: missing a required argument: 'self'. Is that expected, or a potential bug in orion?
    k
    a
    +1
    7 replies · 4 participants
  • b

    Ben Epstein

    04/09/2022, 12:53 PM
    I could of course do c.runit(c) but that seems pretty wrong
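    One workaround, if the decorator cannot bind self, is to keep the flow as a plain module-level function (or a staticmethod) and let it drive the class; a hedged sketch for the Orion beta:
    from prefect import flow

    class MyClass:
        def __init__(self, ct: int = 0):
            self.count = ct

        def runit(self):
            print(f"I have been called {self.count} times")
            self.count += 1

    # the flow is a plain function; the instance method stays undecorated
    @flow
    def run_my_class(ct: int = 0):
        c = MyClass(ct)
        c.runit()

    run_my_class()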
  • c

    Carlos Soza

    04/09/2022, 2:57 PM
    Hello Team Prefect, I have a question about the ECSAgent. When I tried to run my flow I got botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Container.image should not be null or empty. After debugging, I detected that the problem was that the containerDefinitions used by the agent had two entries, one with my ECSRun configuration and another with only {"name": "flow"}, but my configuration only had one. My question is: is this a bug, or is it mandatory to use 'flow' as the containerDefinitions name parameter? (I am using prefect 1.2.0, DockerStorage, ECSRun and ECSAgent, and my configuration for the task_definition is directly in the code.)
    k
    5 replies · 2 participants
  • k

    Ken Nguyen

    04/09/2022, 4:56 PM
    Hi! I’m currently orchestrating dbt via Prefect. The usual dbt flow uses pygit2 to pull from our main branch, then runs the dbt models and creates tables in our production schema in Snowflake. I want to create a tool where we can run 2 dbt instances in parallel, one that pulls from the main branch and one that pulls from a dev branch (chosen by a user via parameters). Each of those would create tables in a separate schema in Snowflake. The purpose is to create 2 versions of certain tables to compare and see the effect of the dev branch. Now, my question is what would be the best way to design the flow(s) for this? I’m not sure if I should create 1 flow and somehow pull 2 separate branches and have dbt run those 2 in parallel, or create 1 parent flow that passes the user-inputted parameter into the child flows, with each child flow running a dbt instance. Or something completely different? Sorry for the lengthy question!
    a
    k
    30 replies · 3 participants
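    A sketch of the parent/child pattern, assuming a registered child flow that takes the branch and target schema as parameters (flow, project, and parameter names are hypothetical):
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import create_flow_run

    with Flow("dbt-compare-parent") as flow:
        dev_branch = Parameter("dev_branch", default="my-feature-branch")

        # one child flow run per branch, each writing to its own Snowflake schema
        main_run = create_flow_run(
            flow_name="dbt-run",
            project_name="analytics",
            parameters={"branch": "main", "schema": "prod_compare"},
        )
        dev_run = create_flow_run(
            flow_name="dbt-run",
            project_name="analytics",
            parameters={"branch": dev_branch, "schema": "dev_compare"},
        )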
  • m

    Michael Law

    04/11/2022, 8:23 AM
    Hey! I have been using prefect for a bit now on AKS in Azure for hosting our agents. On the whole this seems to work well; it creates K8s jobs for our flow runs, but regardless of how we trigger them, it only ever seems to run as many concurrent tasks in the flow as there are virtual or physical CPUs on the cluster. I was under the impression that if we set the scheduler to "threads" and num_workers to 2-4x the number of CPUs, the flow would create that many threads to run its tasks? I am comfortable we can scale these, as we are simply submitting jobs to a databricks cluster, so we have no CPU-bound work at all happening on the cluster; it is purely IO monitoring. Any help would be appreciated. I have attached a sample of how we trigger our flow in the thread. Thanks in advance for any help which could be offered.
    a
    13 replies · 2 participants
m

Michael Law

04/11/2022, 8:23 AM
def run(self, flow: Flow, scheduler = "threads", num_workers = 8):
        env = {
            "DATABRICKS_CLUSTER_ID": self.cluster_id,
            "MOUNT": self.mount,
            "CODEMOUNT": self.codeMount,
            "ENVIRONMENT": self.environment,
            "RELEASE_VERSION": self.release_version,
            "APP_IMAGE": self.kubernetes_job_image,
            "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
            "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
            "PREFECT_PROJECT": self.prefect_project,
            "DATABRICKS_INSTANCE_POOL_ID": self.instance_pool_id,
            "SQL_SERVER_JDBC_CONNECTION_STRING": self.sql_jdbc_connection_string
        }

        flow.executor = LocalDaskExecutor(scheduler = scheduler, num_workers = num_workers)

        # 'DEBUG' is not set on the K8 cluster

        if os.environ.get("DEBUG") == "1":
            flow.run()
        elif os.environ.get("DEBUG") == "2":
            flow.visualize()
        else:
            flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
            flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
a

Anna Geller

04/11/2022, 9:22 AM
How did you assess how many threads and processes your flow run was using? One option that may help here is attaching the executor directly to your Flow object when it gets created:
with Flow("your_flow", executor = LocalDaskExecutor(scheduler = scheduler, num_workers = num_workers)) as flow:
This may help since the executor is retrieved from storage at runtime
m

Michael Law

04/11/2022, 10:16 AM
I can see from the timeline it never runs more than 4, which matches the number of processors on the machine
Thanks for getting back to me
I'll try that suggestion too
a

Anna Geller

04/11/2022, 10:30 AM
Gotcha. What may also help is to log the thread and process ID within your flow. You could do that by attaching the thread and process to your log format:
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
this could be set on the agent or attached to your run config afaik:
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=dict(PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"))
m

Michael Law

04/11/2022, 10:32 AM
Ah yeah great shout @Anna Geller I'll add that now and trigger some of these jobs
👍 1
Success!!
TYVM
a

Anna Geller

04/11/2022, 2:09 PM
Glad to hear it works now! 🙌