prefect-community

    Luis Jaramillo

    11/28/2021, 3:31 PM
    đź‘‹
    đź‘‹ 6
    z
    a
    j
    • 4
    • 3
  • s

    Sridhar

    11/29/2021, 12:38 AM
    Hi, I have a function that runs in parallel to fetch data from an external API. The get_data_asynchronous() function below creates 10 threads and calls the API concurrently. I am using this function in run_factset_api(). As standalone code locally this works fine, but when I schedule a run on Prefect, the run_factset_api() function exits before execution and returns a coroutine object (although locally it returns the desired value). Is there something I should do to facilitate the parallel run on Prefect?
    async def get_data_asynchronous():
        master = pd.DataFrame()  # collect API responses here
        with ThreadPoolExecutor(max_workers=10) as executor:
            with requests.Session() as session:
                # Set any session parameters here before calling `fetch`
                loop = asyncio.get_event_loop()
                tasks = [
                    loop.run_in_executor(
                        executor,
                        company.get_company_records,
                        *(session, [companies], {**company_info, **formulas})
                        # Allows us to pass in multiple arguments to `fetch`
                    )
                    for companies in companies_to_fetch
                ]
                for response in await asyncio.gather(*tasks):
                    master = master.append(response, ignore_index=True)
        return master
    
    @task
    def run_factset_api():
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(get_data_asynchronous())
        master = loop.run_until_complete(future)
        return master
    
    @task
    def save_data_to_s3(emmi_reduction):
        s3_resource = boto3.resource('s3')
        s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())
    
    with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
         response = run_factset_api()        
         if response:
             save_data_to_db(response)
            
    flow.register('pipeline')

    Priyab Dash

    11/29/2021, 9:08 AM
    We have a function defined as a task as below
    @task(log_stdout=True, state_handlers=[notify_run_failure])
    def submit_job_run_to_tmc(job_run):
    but it is being called twice when we run the flow.

    Gabriel Milan

    11/29/2021, 11:45 AM
    Hi all, I was wondering if there's a way of setting environment variables for my jobs on the Kubernetes Agent. I'm trying to do that using the Helm chart, and the agent section of my values.yaml file looks like this:
    agent:
      enabled: true
      prefectLabels:
        - mylabel
      ...
      job:
        ...
        envFrom:
          - secretRef:
              name: gcp-credentials
    The secret gcp-credentials exists and is correct. Unfortunately, this doesn't seem to work.

    Zohaa Qamar

    11/29/2021, 2:07 PM
    Hi all, I have a Python task in my flow that has sys.exit() in it, meaning I want that task to stop if some condition is met and not proceed further. But my task keeps on running in this case and does nothing. Any help?

    Brian Phillips

    11/29/2021, 2:59 PM
    Is there a graphql mutation to change the location of a flow?

    Tim Enders

    11/29/2021, 3:39 PM
    Is there a way to see the release notes on the alpha releases of Orion?

    Guillaume Latour

    11/29/2021, 5:11 PM
    Hi everyone, I hope you're having a good day! I just saw that my Prefect server is out of sync with my Prefect agent due to a timezone mismatch (the Prefect server runs in Docker containers, which have their /etc/localtime pointing to /usr/share/utc/utc, while my agent runs with prefect agent local start, so it uses my local timezone). Is there any configuration option that I am missing that could provide a custom timezone to all the containers launched via prefect server start?

    dammy arinde

    11/29/2021, 5:33 PM
    Hi everyone, is there a way to pass a parameter from one flow to another? I'm using client.create_flow_run() to run the dependent flows, but when I add a parameter to it, it is not passed to the dependent flow.
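    For reference, a minimal sketch of passing parameters through Client.create_flow_run() in Prefect 1.x (the flow ID and the "run_date" parameter name are placeholders, and the child flow must declare a matching Parameter):
    from prefect import Client

    client = Client()
    # Trigger a run of an already-registered child flow, passing parameters
    # (the keys must match Parameter names defined in that flow)
    client.create_flow_run(
        flow_id="xxxx-flow-id",
        parameters={"run_date": "2021-11-29"},
    )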

    Isaac Brodsky

    11/29/2021, 7:23 PM
    When upgrading from older versions of Prefect and using the KubernetesRun run_config, does the version of the agent starting the run matter? I see AttributeError: 'Context' object has no attribute 'image' when DaskExecutor tries to start up a cluster.

    Derek Heyman

    11/29/2021, 8:18 PM
    I'm trying to add a team member to my prefect cloud account, but the invite button is grayed out. From what I understand, the free tier allows 3 users. Currently I only have myself and one other member under the users list.

    Philip MacMenamin

    11/29/2021, 9:36 PM
    Just wondering if anyone has been using Singularity to run containers rather than Docker?

    joshua mclellan

    11/29/2021, 10:30 PM
    From the Prefect CLI I can get the logs for a run, but these logs seem to come in batches. Is there any way to follow the logs the same way you might tail -f a log file?

    Sandip Viradiya

    11/30/2021, 12:10 AM
    We are getting blank white pages on cloud.prefect.io. I think it is a CSS issue: the options are there, but they are rendered in white... hahaha

    Dekel R

    11/30/2021, 8:11 AM
    Hey everyone, I’m trying to deploy one of my flows on Vertex AI (GCP) and I’m getting the following error:
    File "/usr/local/lib/python3.9/site-packages/gcsfs/credentials.py", line 84, in _connect_google_default
        raise ValueError(msg.format(self.project, project))
    ValueError: User-provided project 'my_project' does not match the google default project 'some_generated_id'. Either
    Some code snippets and information about my flow -
    from prefect import Flow
    from prefect.storage import Docker
    from tasks.extract_product_data import extract_data
    from prefect.run_configs import VertexRun
    from prefect.schedules import IntervalSchedule
    from datetime import timedelta
    
    schedule = IntervalSchedule(interval=timedelta(days=1))
    
    with Flow("extract_comparable_products_data",
              storage=Docker(registry_url="us-central1-docker.pkg.dev/xxxx/",
                             dockerfile="./Dockerfile"), schedule=schedule) as flow:
        extract_data()
    
    flow.run_config = VertexRun(machine_type='e2-standard-16', labels=["ml"],
                                service_account='prefect_service_account.iam.gserviceaccount.com')
    The flow has only one task for now, for testing purposes. My task uses data from multiple Google Cloud projects of my organization, so in every Google client interaction I use a specific project as a parameter, for example:
    storage_client = storage.Client(project='my_pro_1')
    The service account that Prefect uses has permissions on all of the relevant projects (in general: Storage, BigQuery, Artifactory, Vertex AI). Anyone familiar with this issue? Thanks.

    Emma Rizzi

    11/30/2021, 10:35 AM
    Hello, I get a strange error when installing Prefect on a VM (see picture below). What can I try? Environment: Ubuntu 18.04, default Python 2.7; I changed to 3.6.9 and tried to install with both pip and pip3.

    Guilherme Petris

    11/30/2021, 10:38 AM
    Hey! Noob question here: I can’t seem to find where the .prefect folder is, to add the Slack token to config.toml as described here: https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions

    Tom Klein

    11/30/2021, 1:04 PM
    Hello, can anyone point me at any kind of comparison between Prefect Core and Prefect Orion? And maybe also some clarity about which of them runs when we go the Cloud path?

    Zheng Xie

    11/30/2021, 1:17 PM
    I can get a Prefect agent running locally and triggered by the Prefect Cloud UI. Now I want to move the agent to a Kubernetes cluster on IBM Cloud. There is no tutorial listing the steps, and the Prefect docs are grouped around concepts, so I don't know what I need to accomplish the goal. Can you please list the steps? For example: 1. Set flow storage to Docker. 2. Push the code to the container registry in the Kubernetes cluster. 3. ... I am hoping that with this list I can look up the Prefect docs to figure out what to do.

    Samuel Hinton

    11/30/2021, 2:45 PM
    Hi team! I'm having a few stability issues with Prefect again. I've noticed some hanging tasks, so I used that big "Cancel All" button in the Prefect dashboard to try to clean things out. Except the tasks are still stuck, they're just stuck in a different state. What would be the proper way to say, more forcefully, "Kill all tasks and get yourself back in working order"?

    Martim Lobao

    11/30/2021, 5:53 PM
    One of the recurring issues I have with Prefect is that things often seem to fail for no reason, and frequently with absolutely no context. I restarted this flow a minute later with no changes at all, and it ran successfully. Are there any recommendations about handling these situations?

    bral

    11/30/2021, 6:41 PM
    Hey! Is there a way to run async code (an async def function) inside a Prefect task? Should I create a loop and then pass the coroutine to loop.run_until_complete() inside the @task?
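    A minimal sketch of that approach, assuming Prefect 1.x; fetch_data() is a hypothetical coroutine standing in for the real async work:
    import asyncio
    from prefect import task

    async def fetch_data():
        # placeholder for the real async work (e.g. API calls)
        await asyncio.sleep(1)
        return "data"

    @task
    def run_async_inside_task():
        # asyncio.run() creates an event loop, runs the coroutine to completion,
        # and closes the loop, so the task blocks until the async work is done
        return asyncio.run(fetch_data())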

    Isaac Brodsky

    11/30/2021, 8:28 PM
    Ever seen this error in Kubernetes?
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"dask-root-elided\" is forbidden: User \"system:serviceaccount:default:default\" cannot get resource \"pods/log\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"name":"dask-root-elided","kind":"pods"},"code":403}
    I was trying to install a new Prefect agent and it seems the RBAC setup is not right. The agent was configured with:
    prefect agent kubernetes install --key $PREFECT_API_KEY --rbac --label my_label_here

    Isobel Jones

    12/01/2021, 11:01 AM
    Hi, has anyone seen this error before on Kubernetes: Failed to load and execute Flow's environment: AssertionError(None)? Trying to access a private GitHub repo for the first time. I have set the GitHub token. Running on GKE.
    flow.storage = GitHub(
            repo="Infrastructure/prefect",                           # name of repo
            path="_cfg/server/hello.py",                   # location of flow file in repo
            base_url="<https://github.private-repo.com/>"
        )
    We are setting the env variable that contains the token.

    Aqib Fayyaz

    12/01/2021, 12:47 PM
    Hi, why does the Prefect agent require Role and RoleBinding permissions when deploying the agent on GKE?

    André Petersen

    12/01/2021, 1:55 PM
    Hi, can you recommend an article which covers the following aspects? 1. How to use Prefect without Prefect Cloud, if you still want a Web UI? 2. What are the alternatives to Prefect Cloud? 3. What are the advantages and disadvantages of those alternatives? 4. Probably too specific: (How to setup Prefect on AWS Fargate without Prefect Cloud, but with UI)

    Wieger Opmeer

    12/01/2021, 2:39 PM
    In Prefect Cloud there are Cloud Hooks and Automations. What would be a typical use case for each of them? Or specifically: is notifying when a flow run fails best done with a hook or with an automation?

    Leon Kozlowski

    12/01/2021, 2:49 PM
    Has anyone ever attached a service account to their agent with helm? I’m getting 403s after attaching a service account - including error message in thread

    Daniel Suissa

    12/01/2021, 3:54 PM
    Hi all, I'm Daniel from Sentra ✌️ We're psyched to leverage Prefect and this great community.
    đź‘‹ 4
    k
    j
    +2
    • 5
    • 4
  • d

    Daniel Suissa

    12/01/2021, 3:57 PM
    The question that brought me here: how to create dynamic dependencies inside tasks? Seems like the Prefect way is to run subflows, but flows don't have return values that can be taken as parameter by other flows.. I'm trying to do something like: a = flow_a.run() b = flow_b.run(a) or alternatively call task from other tasks
    k
    • 2
    • 12
Powered by Linen
Title
d

Daniel Suissa

12/01/2021, 3:57 PM
The question that brought me here: how do I create dynamic dependencies inside tasks? It seems like the Prefect way is to run subflows, but flows don't have return values that can be taken as parameters by other flows. I'm trying to do something like: a = flow_a.run(); b = flow_b.run(a), or alternatively call a task from other tasks.
Seems like in Orion this is straightforward btw, but we want to start with the production-ready version.

Kevin Kho

12/01/2021, 4:10 PM
So in Orion this was made straightforward because it was a pain point in current Prefect. flow.run() is not meant for production, so you need to register the subflows and then trigger them with the StartFlowRun or create_flow_run task, like this:
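A minimal sketch of that pattern in Prefect 1.x, assuming the child flow is already registered (the flow and project names below are placeholders):
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:
    # Kick off a run of the registered child flow and block until it finishes
    child_run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
    wait_for_flow_run(child_run_id, raise_final_state=True)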
Calling a task from another task can be done like this:
@task
def abc():
    return 1

@task
def bcd():
    abc.run()
but the abc in bcd is no longer a task. It's like a regular Python function, so you don't get observability.

Daniel Suissa

12/01/2021, 4:23 PM
Thanks @Kevin Kho! Then I have the right understanding here... is there an estimate on when Orion will be production ready?
It may be an overly simple question, but I'm wondering if I'm trying to force Prefect onto my use case: was Prefect designed to run deeply dependent, dynamic pipelines? It seems like running a task in a task or a flow in a flow are pretty obvious requirements of ETLs, but maybe my view is narrow here.

Kevin Kho

12/01/2021, 4:26 PM
No public timeline for that, but a lot of current users do run this subflow pattern. Task in a task is a bit more vague. Could you give me an example of what you are thinking?

Daniel Suissa

12/01/2021, 4:29 PM
So in Airflow I can declare that task A depends on task B. This tells the engine to run the B node before the A node. I do that on a task; I'd call that "deep dependency declaration". It seems like in Prefect I can only do that within a Flow, so does that mean I need to serialize the logic that creates the dependencies?
operator* not task sorry

Kevin Kho

12/01/2021, 4:34 PM
Ah ok so in Prefect upstream and downstream dependencies are on the Flow level because tasks can be reused with different dependencies each time. Example:
@task
def plus_one(x):
    return x+1

with Flow("name") as flow:
    a = plus_one(1)
    b = plus_one(a)
This will implicitly build the dependencies for you and pass the value of a in memory.
You can also set it with:
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task(c_inputs, upstream_tasks=[a,b])
or
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])
if there is no data dependency

Daniel Suissa

12/01/2021, 4:44 PM
Thanks again @Kevin Kho. Sounds good except the slightly awkward way to fetch subflow results. I think we can use this for now and migrate later. Looking forward to the public release date 🙂

Kevin Kho

12/01/2021, 4:45 PM
Yes for sure. In case you haven’t seen it, this blog post shows how to retrieve results.
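For reference, a sketch of one way to retrieve a subflow task's result in Prefect 1.x (the flow, project, and task-slug names below are placeholders):
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result

with Flow("parent-flow") as flow:
    # Trigger the registered child flow, then pull the result of one of its tasks by slug
    child_run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
    child_result = get_task_run_result(child_run_id, task_slug="my_task-1")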