prefect-community

    Marwan Sarieddine

    04/06/2021, 2:10 PM
    Hi folks, we are getting failures for all of our flow runs after updating to Prefect v0.14.15. Basically, the failure happens at the state handler level after the flow has finished running - please see the traceback in the thread.

    Florian Kühnlenz

    04/06/2021, 3:00 PM
    Hi. I am having some trouble making prefect register --module work. My project looks like this:
    flows
      + __init__.py
      + my_flow.py
      + shared_tasks
        + __init__.py
        + util.py
    When I run
    prefect register --project 'Prefect Testing' -m 'flows.my_flow'
    I get "No module named 'flows'". What am I missing?

    Andor Tóth

    04/06/2021, 4:01 PM
    Hello. I'm still test-driving Prefect (v0.14.15), but my flow gets stuck and I end up with zombie processes. Any ideas? Here's the code without imports:
    SQL_DIR = Path('sql')
    
    @task
    def list_query_names():
        return [f.name for f in SQL_DIR.glob('*.sql')]
    
    @task(log_stdout=True, timeout=15, task_run_name='{name}-{date:%F_%T}', checkpoint=False)
    def exec_query(name: str):
        sql = Path(SQL_DIR / name).read_text()
        print('Query name: %s' % name)
        engine = sqla.create_engine(DSN)
        rs = engine.execute(sql)
        return dict(keys=rs.keys(), rows=rs.fetchall())
    
    @task
    def save_results(rs, name):
        with (OUT_DIR / name).with_suffix('.txt').open('w') as f:
            csv_writer = csv.writer(f, delimiter="\t")
            csv_writer.writerow(rs['keys'])
            csv_writer.writerows(rs['rows'])
    
    with Flow("Queries") as flow:
        query_names = list_query_names()
        results = exec_query.map(query_names)
        save_results.map(results, query_names)
        
    flow.executor = LocalDaskExecutor(num_workers=2, schedule='processes')
    flow.run()

    Robin

    04/06/2021, 4:13 PM
    To all the adventurous Apple M1 users out there (and in general everyone working across different OSes):
    How do you build your Prefect flows to ensure that the Docker images run on the desired architectures and operating systems?

    Tomás Emilio Silva Ebensperger

    04/06/2021, 4:25 PM
    Is there any way to have one LocalAgent running multiple flows at the same time?

    Vincent

    04/06/2021, 4:43 PM
    Hi - I am having an issue where a user cannot view or join the group via the web interface. Any advice is welcome.

    Jacob Hayes

    04/06/2021, 7:09 PM
    How can mapped tasks be checkpointed/cached? A default Flow-level result with location="{flow_run_id}/{task_run_id}.prefect_result" works for the unmapped tasks, but I don't see saved results for the mapped tasks (even though each one has a unique task_run_id).

    Zack Novak

    04/06/2021, 7:15 PM
    Hey all! Looks like Vincent might be having the same issue as me, but I am unable to add users with the correct role via the web interface to our tenants. Is there anyone from Prefect who could help me out with this? I see the error: "Looking for role-based access controls? This feature is only available on Enterprise plans; check out our pricing page for more details." I can confirm we are on an Enterprise plan. We did recently create 5 new tenants under our business license, but this issue is affecting the previous tenant with RBAC.
    👀 1

    Kieran

    04/06/2021, 11:44 PM
    Hey, does anyone have any good tips for hunting down why a Flow does not serialise?

    Jonathan Chu

    04/07/2021, 12:06 AM
    How do you usually manage staging and production versions of a flow? I.e. for a single flow: deploy the latest version to staging, run it there, and if it looks good, push that version to production. Is there anything better than just copy-pasting the flow definition code with a different flow name, like a data-ingestion-flow-staging and a data-ingestion-flow-production?
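    One lightweight alternative to copy-pasting (just a sketch, assuming Prefect 1.x and a hypothetical ENVIRONMENT variable set at registration time) is to keep a single flow definition and derive the flow name and project from the environment when you register:
    import os

    from prefect import Flow, task

    # Hypothetical switch; any mechanism that tells the registration step
    # whether this is "staging" or "production" would work.
    ENV = os.environ.get("ENVIRONMENT", "staging")

    @task
    def ingest():
        print("ingesting data")

    with Flow(f"data-ingestion-flow-{ENV}") as flow:
        ingest()

    if __name__ == "__main__":
        # Same definition, registered under an environment-specific name/project.
        flow.register(project_name=f"data-ingestion-{ENV}")
    That way promoting a version to production is a re-registration with ENVIRONMENT=production rather than a second copy of the code.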

    Ranu Goldan

    04/07/2021, 1:48 AM
    Hi everyone, we know we can set default parameters on a schedule. But is it possible to set a default flow run name with some prefix per schedule? Example: in schedule A the flow run would be named schedule_a_run_2020-01-01, and in schedule B it would be schedule_b_run_2020-01-01. Thanks in advance!

    tash lai

    04/07/2021, 4:33 AM
    Hey! There's a problem. Just as an example, say there's a website that has a list of tv shows, and a page for each tv show has links to information about every episode. I want to scrape all this info and save everything into a table (url, show_name, episode_name, episode_description).
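    A rough sketch of how this kind of nesting is often expressed with mapping in Prefect 1.x (the scraping helpers here are hypothetical placeholders): map once over the shows, then use flatten to map again over all episodes across all shows:
    from prefect import Flow, flatten, task

    @task
    def list_show_urls():
        # Hypothetical: fetch the index page and return the show URLs.
        return ["https://example.com/shows/1", "https://example.com/shows/2"]

    @task
    def list_episode_urls(show_url):
        # Hypothetical: one list of episode URLs per show.
        return [f"{show_url}/episode/{i}" for i in range(3)]

    @task
    def scrape_episode(episode_url):
        # Hypothetical: one row (url, show_name, episode_name, episode_description).
        return (episode_url, "show", "episode", "description")

    with Flow("scrape-tv-shows") as flow:
        shows = list_show_urls()
        episodes = list_episode_urls.map(shows)        # one list per show
        rows = scrape_episode.map(flatten(episodes))   # flat-map over every episode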

    tarikki

    04/07/2021, 5:26 AM
    Hi! Sorry if this is the wrong place to ask this question 🙇 Does someone know if there are any plans to support Google Cloud Run as an agent? It's the Google equivalent of Fargate. I mainly use Google infrastructure and I'm thinking of automating some tasks through Prefect, so trying to figure out what options there are 😊 Thanks!

    Tomás Emilio Silva Ebensperger

    04/07/2021, 2:07 PM
    Following up on a question that was successfully answered yesterday: when you run an agent (in my case a local agent) + Cloud, which flows does it listen for? Any flow within the account (deduced from the token given)?
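    For what it's worth, labels are the usual way this gets scoped in Prefect 1.x (check the agent docs for the exact matching rule) - a minimal sketch, where the "dev" label and flow are assumptions rather than anything from the thread:
    from prefect import Flow, task
    from prefect.run_configs import LocalRun

    @task
    def hello():
        print("hello")

    with Flow("labelled-flow") as flow:
        hello()

    # Runs of this flow are only picked up by agents carrying the "dev" label,
    # e.g. an agent started with: prefect agent local start --label dev
    flow.run_config = LocalRun(labels=["dev"])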

    Igor Bondartsov

    04/07/2021, 3:55 PM
    Hello. I have a little question: I found this example: https://github.com/PrefectHQ/prefect/blob/master/examples/old/task_library/mysql/mysql_flow.py - how can I make the host a parameter?

    Hawkar Mahmod

    04/07/2021, 4:55 PM
    What is the Prefect-ish way to end a flow run? Suppose one of the early tasks in a flow does not have the required data to continue the rest of the flow - how does one gracefully end the flow?
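    One common pattern (a sketch of one possible approach, not necessarily the answer given in the thread) is to raise a signal such as SKIP from the early task; downstream tasks then skip by default and the run still ends cleanly:
    from prefect import Flow, task
    from prefect.engine import signals

    @task
    def fetch_data():
        data = []  # hypothetical: nothing to process this run
        if not data:
            # Downstream tasks will skip, and the flow run ends gracefully.
            raise signals.SKIP("No data available - ending the run early.")
        return data

    @task
    def process(data):
        print(f"processing {len(data)} records")

    with Flow("graceful-end") as flow:
        process(fetch_data())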

    Satheesh K

    04/07/2021, 5:15 PM
    Hello, is there a way to export a Prefect flow definition to some config format like YAML, and then re-build the flow from the same imported config?

    xyzy

    04/07/2021, 6:08 PM
    Is there a reason that flow.result and flow.executor are not serialized for flow.register, unlike flow.run_config and flow.storage?

    Matthew Millendorf

    04/07/2021, 6:12 PM
    Hello, I am looking to use Prefect to help scale some large batch processes with ECS (Fargate) and Dask and am wondering if anyone has any insight on doing this or could point me to some resources.  Additionally, I am having some difficulty figuring out why my submitted Flow’s ECS Task immediately becomes INVALID when provisioning resources.

    xyzy

    04/07/2021, 8:06 PM
    Did anyone try to build a multi-stage Docker image for use with Prefect? I'm using python:3.8-buster as the base for the builder stage and python:3.8-slim-buster for the final image, but I get errors like this when trying to do a KubernetesRun:
    Pod prefect-job-6ab817cf-d6446 failed.
    	Container 'flow' state: terminated
    		Exit Code:: 1
    		Reason: Error
    DockerRun doesn't even continue after pulling the image.

    Jay Sundaram

    04/07/2021, 8:39 PM
    Anyone know what it means when you attempt to register a flow with prefect register and you don't get any error/warning messages, only:
    ================= 0 registered, 1 skipped =================

    Jay Sundaram

    04/08/2021, 2:07 AM
    Is there a clear documented example where a Prefect Flow is defined in the entrypoint script in a Docker image and that Flow can be invoked/executed from the Prefect UI? All of this running locally.

    Ranu Goldan

    04/08/2021, 2:18 AM
    Hi everyone. I'm trying to implement dynamic secrets usage with the PrefectSecret task. I want to pass the secret key via a Parameter, but it says that the PrefectSecret argument should be statically defined, so it's impossible to use a parameter as the secret key. Is there any workaround for this? Thanks in advance!
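    One workaround sketch (assuming Prefect 1.x and that the secret already exists in your backend): resolve the secret inside a task at runtime via prefect.client.Secret, so the secret name can come from a Parameter:
    from prefect import Flow, Parameter, task
    from prefect.client import Secret

    @task
    def get_secret_value(secret_name: str):
        # Resolved at runtime, so the name can be a Parameter value.
        return Secret(secret_name).get()

    @task
    def use_secret(value):
        print("got a secret of length", len(value))

    with Flow("dynamic-secret") as flow:
        secret_name = Parameter("secret_name", default="MY_SECRET")
        use_secret(get_secret_value(secret_name))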

    Igor Bondartsov

    04/08/2021, 5:03 AM
    Hi! How can I add tasks at runtime? For example: I am waiting for a Parameter that holds the count of how many identical tasks I want to create.
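    Mapping is the usual way to get a runtime-determined number of task runs in Prefect 1.x - a minimal sketch assuming a hypothetical n_tasks Parameter:
    from prefect import Flow, Parameter, task

    @task
    def build_indices(n):
        # The number of mapped children is decided at runtime from the Parameter.
        return list(range(n))

    @task
    def do_work(i):
        print(f"working on chunk {i}")

    with Flow("dynamic-task-count") as flow:
        n_tasks = Parameter("n_tasks", default=3)
        do_work.map(build_indices(n_tasks))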

    g.suijker

    04/08/2021, 11:23 AM
    Hi all, we are currently facing a problem where a flow run in Prefect Cloud takes significantly more time (a factor of 20-25) than the same flow run in Prefect Server. The flow retrieves data from a MySQL database and inserts the data into a SQL Server database. When running the flow in Prefect Server, with Docker storage and a Docker agent, I get around 1000 rows/sec insertions into the SQL Server db, while in Prefect Cloud, with Docker storage and a Kubernetes agent, I get around 40 rows/sec. Any ideas on the cause of this performance issue?

    Domantas

    04/08/2021, 1:37 PM
    Hello guys! Sorry for a stupid question, but maybe someone knows if it is possible to pass the host Parameter as a MySQLExecute argument? Pseudo code:
    from prefect.tasks.mysql.mysql import MySQLExecute
    from prefect import Flow, Parameter
    
    example_query = MySQLExecute(
        name="example_query",
        db_name="db name",
        user="some user",
        password="123456",
        port=1234,
        query="select * from example_table;"
    )
    
    
    with Flow("example") as f:
        db_host = Parameter("db_host", default="host_address")

        mysql_execute = example_query(host=db_host)
    When I execute this pseudo code, I get this error:
    TypeError: __init__() missing 1 required positional argument: 'host'
    According to the documentation (https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute), the host argument is not optional, so it must be provided when constructing MySQLExecute. Is there a proper way to pass the host value to MySQLExecute?
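    If this version of MySQLExecute only accepts host at construction time, one workaround sketch (plainly a substitute for the task-library task, not its API) is a small custom task built on pymysql, so every connection argument - including host - can come from a Parameter at runtime:
    import pymysql
    from prefect import Flow, Parameter, task

    @task
    def run_query(host, query, db_name="db name", user="some user",
                  password="123456", port=1234):
        # All connection details are runtime arguments, so host can be a Parameter.
        conn = pymysql.connect(host=host, user=user, password=password,
                               database=db_name, port=port)
        try:
            with conn.cursor() as cur:
                cur.execute(query)
                return cur.fetchall()
        finally:
            conn.close()

    with Flow("example") as f:
        db_host = Parameter("db_host", default="host_address")
        results = run_query(db_host, "select * from example_table;")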

    Jay Sundaram

    04/08/2021, 2:17 PM
    Any tips on how to further investigate the root cause of:
    Unexpected error: ValueError('Could not infer an active Flow context.')

    Jay Sundaram

    04/08/2021, 3:08 PM
    What does it mean when, on invoking a system call from within a task, a <Parameter> is used instead of the expected string?
    --destination_bucket_name <Parameter: destination_bucket_name>
    when I was expecting:
    --destination_bucket_name myorg-S3-bucket

    Remi Paulin

    04/08/2021, 4:30 PM
    Hey, we've recently started exploring Prefect and we'd like to schedule Spark jobs on Cloud Dataproc - I've noticed the excellent integration with Databricks but didn't see anything re. running jobs on other Spark platforms/distributions. Am I missing something? Has anyone implemented this already? Thanks a lot for your help!

Remi Paulin

04/08/2021, 5:24 PM
Hey, another quick question: we would like to export data lineage metadata from Prefect to our Data Catalog - is there an API from which we can pull such metadata?

Kevin Kho

04/08/2021, 5:32 PM
Hey @Remi Paulin, do you mean that when a flow runs, you want to keep track of the metadata (what tasks happened?) and store it in your Data Catalog? What format would you need it in for the data catalog?

Remi Paulin

04/08/2021, 5:44 PM
yes! any format should be fine actually. We're thinking of using Data Galaxy as our Data Catalog and the metadata would eventually need to be parsed to be ingested into Data Galaxy anyway.

Kevin Kho

04/08/2021, 5:47 PM
Ah ok so we have a GraphQL API where you can query your flows and task runs. Assuming they are named descriptively, you can pull out the data, parse it, and feed it into the data catalog.
I think what this would look like is you have a separate Python script to query your Flow data and parse them to upload to Data Galaxy (maybe even as another Prefect flow).
👍 1
Out of curiosity, do you know how detailed you want your lineage to be? Does it need to specify that “these 3 data sources were joined and filtered to produce this dataset”? Or is it more of carrying over schema and description from original columns?
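A rough sketch of what that separate query script could look like, using the built-in client (the exact GraphQL field names may differ slightly from this - the interactive API tab in the UI is the reference):
from prefect import Client

# Query recent flow runs plus their task runs; adjust the fields as needed.
query = """
query {
  flow_run(limit: 10, order_by: {start_time: desc}) {
    name
    state
    start_time
    end_time
    flow { name }
    task_runs {
      state
      task { name slug }
    }
  }
}
"""

client = Client()  # uses the API token from your local Prefect config
result = client.graphql(query)
for run in result["data"]["flow_run"]:
    print(run["flow"]["name"], run["name"], run["state"])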

Remi Paulin

04/08/2021, 5:55 PM
Ok amazing I'll check this out. Ideally we'd like to get quite detailed lineage such as in the example you mentioned (joins & filters involved). But from what I understood since Prefect Cloud doesn't have much visibility into the actual logic maybe this wouldn't be easy to implement. Using Prefect with dbt for instance would maybe enable us to retrieve such detailed metadata (only for the flows governed by dbt of course).

Kevin Kho

04/08/2021, 6:03 PM
I think it can be implemented if your Tasks are well named. It would also help if there is a relatively clean “separation of concerns” with your data engineering. You are right though that we actually don’t see the data (with our Hybrid Model).

Remi Paulin

04/08/2021, 6:07 PM
ok got it - I definitely need to think about this more. Thanks again for your help!!
👍 1