prefect-community
  • s

    Stéphan Taljaard

    06/18/2021, 1:16 PM
    Prefect Schedules + Filters: I have two schedules on the same flow: • Pull a small chunk of data every 30 minutes • Pull a large chunk of data once per day at 05:30. At 05:30, two runs will be scheduled - one for the small chunk and one for the large chunk. However, since there is a small overlap in time, duplicate data will be pulled. Is there a way to edit the following schedule
    every_30_min = IntervalClock(interval=timedelta(minutes=30), parameter_defaults={"Start date hours delta": 0.75})
    daily_schedule_time = pendulum.time(5, 30)
    every_day_0530 = IntervalClock(
        start_date=pendulum.today(tz=TIMEZONE).at(daily_schedule_time.hour, daily_schedule_time.minute),
        interval=timedelta(days=1),
        parameter_defaults={"Start DateTime": "yesterday"},
    )
    
    schedule = Schedule(clocks=[every_30_min, every_day_0530])
    to filter as follows: • Pull a large chunk of data once per day at 05:30 • Pull a small chunk of data every 30 minutes (but not at 05:30)? I think I need a
    no_filter
    , however it seems that it applies to all clocks. It would thus remove both of my scheduled runs, leaving only the 30-minute one?
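    A possible workaround (a sketch, assuming Prefect 1.x clocks): since filters apply to the whole Schedule rather than per clock, the "every 30 minutes except 05:30" cadence can be expressed directly with CronClocks instead. TIMEZONE and the parameter defaults below are the user's own values.
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    import pendulum

    start = pendulum.now(TIMEZONE)  # timezone-aware anchor for the clocks

    small_chunk_clocks = [
        # every half hour, skipping the 05:00-05:59 window entirely...
        CronClock("0,30 0-4,6-23 * * *", start_date=start,
                  parameter_defaults={"Start date hours delta": 0.75}),
        # ...then add the 05:00 run back, so only the 05:30 slot is dropped
        CronClock("0 5 * * *", start_date=start,
                  parameter_defaults={"Start date hours delta": 0.75}),
    ]
    large_chunk_clock = CronClock("30 5 * * *", start_date=start,
                                  parameter_defaults={"Start DateTime": "yesterday"})

    schedule = Schedule(clocks=small_chunk_clocks + [large_chunk_clock])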
    k
    • 2
    • 2
  • p

    Prabin Mehta

    06/18/2021, 2:16 PM
    Hi everyone. I am running Prefect in a Kubernetes cluster, using Dask as the executor and KubernetesRun for the run environment. I am on Cloud, on the trial plan. What I found is that there is a lag of 4-65 seconds between a job being scheduled and the job running. Is there any way to tune the scheduler or the workers to run with minimal lag? Let me know if I should check some config for this.
    k
    z
    • 3
    • 5
  • m

    Marko Jamedzija

    06/18/2021, 3:40 PM
    Hello 🙂 I’ve been searching through these channels but couldn’t find the answer to this question. I’m using
    RunNamespacedJob
    task to run k8s jobs. When they fail, they don’t get deleted, which is fine and described in the docstring:
    If job is in the failed status, resources will not be removed from the cluster so that user can check the logs on the cluster.
    The problem appears the next time I run a job, because it complains that a job with the same name already exists, and I don’t want to delete the job manually every time this happens. What would be the ideal way to work around this? Thanks!
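    One way to work around it (a sketch, not an established pattern): make the job name unique per flow run, so a failed job left on the cluster for debugging never collides with the next run. Here body is assumed to be the Kubernetes job manifest already being passed to RunNamespacedJob.
    import copy

    import prefect
    from prefect import task

    @task
    def unique_job_body(body: dict) -> dict:
        # suffix the job name with (part of) the flow-run id
        body = copy.deepcopy(body)
        run_id = prefect.context.get("flow_run_id", "manual")[:8]
        body["metadata"]["name"] = f'{body["metadata"]["name"]}-{run_id}'
        return body
    The leftover failed jobs can then be cleaned up on your own schedule, for example with a periodic DeleteNamespacedJob task.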
    m
    j
    • 3
    • 6
  • m

    matta

    06/18/2021, 6:15 PM
    Current gig officially made a Prefect account! To get started (on GCP), I'd just be using it to schedule Cloud Functions and Cloud Run stuff we already wrote - would I be okay deploying the server (where the Agents and stuff run) to a Compute Engine instance and running things locally in that case? Because each Flow would just be calling a Cloud Function. Or should I dive straight into GKE? Thanks!
    k
    • 2
    • 6
  • b

    Berty

    06/18/2021, 6:39 PM
    Can someone suggest a best practice for mapping a large set to a task? My flow is failing with the error below. I'm just using a local Dask executor in a very simple single-task flow. I don't really understand this warning, and it's unclear how to use something like
    client.scatter
    ?
    python3.8/site-packages/distributed/worker.py:3373: UserWarning: Large object of size 126.73 MB detected in task graph: 
      {'task': <Task: blah>, 'state': None, 'ups ... _parent': True}
    Consider scattering large objects ahead of time
    with client.scatter to reduce scheduler burden and 
    keep data on workers
    
        future = client.submit(func, big_data)    # bad
    
        big_future = client.scatter(big_data)     # good
        future = client.submit(func, big_future)  # good
      warnings.warn(
      
    Killed
    
    python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 48 leaked semaphore objects to clean up at shutdown
      warnings.warn('resource_tracker: There appear to be %d '
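    The warning means the mapped arguments themselves (~127 MB here) are being baked into the task graph that Dask has to ship around. A common workaround (a sketch; the paths are placeholders) is to map over lightweight references and load the heavy data inside the task, so the graph stays small:
    from prefect import task, Flow

    @task
    def list_chunks() -> list:
        # return references (paths, keys, id ranges) instead of the data itself
        return ["s3://bucket/part-0.parquet", "s3://bucket/part-1.parquet"]

    @task
    def process(ref: str):
        # load the heavy data here, on whichever worker runs this task
        ...

    with Flow("mapped-refs") as flow:
        process.map(list_chunks())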
    k
    • 2
    • 10
  • l

    Leon Kozlowski

    06/18/2021, 6:41 PM
    Hi all - I found an older thread related to deploying a Kubernetes Agent by way of Terraform; the example is seen here. I was wondering if anyone has had success with this deployment pattern? My use case would not be with AKS but rather with EKS on Fargate.
    m
    • 2
    • 1
  • b

    Ben Muller

    06/18/2021, 8:05 PM
    Does Prefect have a recommended instance type for the agent on AWS? Is a t2.micro big enough? It's just listening for jobs and sending off task definitions, right? That shouldn't require much computational power...
    k
    m
    +2
    • 5
    • 11
  • m

    matta

    06/18/2021, 8:20 PM
    Where will
    ~/.prefect/config.toml
    be if I installed with conda?
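    For what it's worth, the location does not depend on the installer: Prefect reads the user config from ~/.prefect/config.toml (or from PREFECT__USER_CONFIG_PATH if that is set). A quick way to confirm, assuming prefect.configuration.USER_CONFIG still exists in your version:
    import prefect

    # the path Prefect will read the user config from
    print(prefect.configuration.USER_CONFIG)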
    m
    • 2
    • 6
  • m

    matta

    06/18/2021, 9:13 PM
    For authentication when running a GCP Cloud Function, can I just put the key for the GCP service account that runs the function into a Prefect Secret?
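    That approach generally works; a minimal sketch, with GCP_SERVICE_ACCOUNT_KEY as a hypothetical secret name holding the service-account key JSON:
    from prefect import task
    from prefect.client import Secret

    @task
    def invoke_function():
        # hypothetical secret holding the service-account key JSON
        key_info = Secret("GCP_SERVICE_ACCOUNT_KEY").get()
        # hand key_info to google-auth to mint credentials, then call the function
        ...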
    k
    m
    • 3
    • 12
  • a

    Andrew Hannigan

    06/19/2021, 12:51 AM
    I'm getting an error where a downstream task has a TriggerFailed error but the upstream task that it depends on is pending, not failed. This is the error message I am seeing: "Trigger was "all_successful" but some of the upstream tasks failed." Any ideas?
    k
    • 2
    • 3
  • g

    Gopinath Jaganmohan

    06/19/2021, 2:15 PM
    Hi Prefect team, we are a SaaS platform company and would like to use the open-source version of Prefect. As Prefect Core is Apache-licensed, we should be good to go, but I would like some clarification on Prefect Server and the UI. How can we use Prefect in our SaaS platform? We are planning to use it for data science pipelines; our users will not directly use the Prefect UI or Server, which will be internal to our company. Our users will register flows from Jupyter notebooks. Does the Prefect Community License allow this use case or not?
    k
    t
    • 3
    • 7
  • a

    Ayyanar Thangaraj

    06/21/2021, 3:24 AM
    (prefect-env) root@ubuntu:/venv/prefect-env/flows# python email.py
    Traceback (most recent call last):
      File "email.py", line 1, in <module>
        from prefect import task, Flow
      File "/venv/prefect-env/lib/python3.8/site-packages/prefect/__init__.py", line 1, in <module>
        import prefect.utilities
      File "/venv/prefect-env/lib/python3.8/site-packages/prefect/utilities/__init__.py", line 9, in <module>
        import prefect.utilities.notifications
      File "/venv/prefect-env/lib/python3.8/site-packages/prefect/utilities/notifications/__init__.py", line 1, in <module>
        from prefect.utilities.notifications.notifications import callback_factory
      File "/venv/prefect-env/lib/python3.8/site-packages/prefect/utilities/notifications/notifications.py", line 7, in <module>
        import smtplib
      File "/usr/lib/python3.8/smtplib.py", line 47, in <module>
        import email.utils
      File "/venv/prefect-env/flows/email.py", line 1, in <module>
        from prefect import task, Flow
    ImportError: cannot import name 'task' from partially initialized module 'prefect' (most likely due to a circular import) (/venv/prefect-env/lib/python3.8/site-packages/prefect/__init__.py)

    (prefect-env) root@ubuntu:/venv/prefect-env/flows# cat email.py
    from prefect import task, Flow
    from prefect.tasks.notifications.email_task import EmailTask

    @task(name="Task A")
    def send_email():
        EmailTask(
            subject="Test Prefect EmailTask",
            msg="Hello success",
            email_to="email@gmail.com"
        )

    with Flow("Email Task example") as flow:
        send_email()

    flow.register(project_name="test")
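    The traceback itself points at the cause: the flow file is named email.py, so when smtplib runs import email.utils, Python picks up the flow file instead of the standard library's email package. Renaming the file (and deleting any leftover __pycache__) resolves it:
    mv email.py email_flow.py
    python email_flow.py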
    k
    • 2
    • 8
  • a

    Ayyanar Thangaraj

    06/21/2021, 3:25 AM
    Has anyone faced this issue? Please help me with this.
  • a

    Ayyanar Thangaraj

    06/21/2021, 3:25 AM
    I tried with 2 environments and faced the same issue on both.
  • a

    Ayyanar Thangaraj

    06/21/2021, 3:26 AM
    Please help me with this. Thanks in advance.
  • n

    Noah Holm

    06/21/2021, 7:02 AM
    Is restarting a failed flow that essentially needs the whole flow to rerun not supported? I have a couple of tasks whose dependencies are managed by passing data with the functional API, as well as by manually setting
    upstream_tasks=[some_task]
    for those that just need to run in order. When the last one fails, it'll only retry the failed task and not the ones it depends on. I use S3 storage, and therefore S3 results, but I have added
    checkpoint=False
    to all the tasks in my flow. When restarting in the Cloud UI I get a message saying that "restarting tasks may require more config, read docs", but I don't see where that solves my use case. I would expect the tasks that have dependent tasks in the flow to get rerun, since they didn't checkpoint any results.
    k
    • 2
    • 3
  • t

    Thomas Hoeck

    06/21/2021, 11:00 AM
    Hi, can anyone help with making a GraphQL call to start a flow (create a flow run) and get the id of the flow run back (so I can monitor its state)?
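    For reference, a sketch of the two calls against the Prefect 1.x GraphQL API (the ids are placeholders):
    mutation {
      create_flow_run(input: { flow_id: "<flow-id>" }) {
        id
      }
    }
    The returned id can then be polled for state:
    query {
      flow_run_by_pk(id: "<flow-run-id>") {
        state
        state_message
      }
    }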
    r
    k
    • 3
    • 6
  • o

    Omar Sultan

    06/21/2021, 11:30 AM
    Hi guys, wondering if anyone has worked with Spark on Prefect. Is there a way to pass Spark DataFrames between two tasks? I am trying to run Spark in client mode and pass the DataFrames between tasks.
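    A Spark DataFrame is a lazy query plan bound to its SparkSession, so it generally cannot be serialized between tasks. The usual workaround (a sketch; the staging path is a placeholder) is to persist the data and pass a reference instead:
    from prefect import task
    from pyspark.sql import SparkSession

    @task
    def extract(source_path: str) -> str:
        spark = SparkSession.builder.getOrCreate()  # re-acquire the session in each task
        df = spark.read.parquet(source_path)
        staged = "/tmp/stage/trades"  # placeholder staging location
        df.write.mode("overwrite").parquet(staged)
        return staged  # pass the path, not the DataFrame

    @task
    def transform(staged_path: str):
        spark = SparkSession.builder.getOrCreate()
        df = spark.read.parquet(staged_path)
        # continue transforming...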
    z
    k
    • 3
    • 10
  • t

    Toni Vlaic

    06/21/2021, 11:50 AM
    Hello everyone, I am trying to create a flow that triggers other flows, using this as a starting point. All flows are executed in Docker containers, but when the parent flow triggers the child flows I get a failure with this stack trace:
    Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
        run_link = client.get_cloud_url("flow-run", flow_run_id)
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 895, in get_cloud_url
        tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 928, in get_default_tenant_slug
        res = self.graphql(query)
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 318, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
    Any ideas on how to approach troubleshooting this?
    k
    • 2
    • 8
  • s

    Shivam Shrey

    06/21/2021, 12:38 PM
    Hi, can we access the flow-run-id inside the flow while it's running? I mean, something like
    this.id
    in the currently running flow.
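    Yes - in Prefect 1.x the id is available from prefect.context while the flow run is executing:
    import prefect
    from prefect import task

    @task
    def report_run_id():
        flow_run_id = prefect.context.get("flow_run_id")
        prefect.context.get("logger").info("Running as flow run %s", flow_run_id)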
    d
    p
    • 3
    • 4
  • g

    Gitesh Shinde

    06/21/2021, 1:51 PM
    How do I set the Docker image in
    KubernetesRun
    so that it is pulled from AWS ECR?
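    A sketch (the registry URL and secret name are placeholders): point KubernetesRun at the ECR image and reference a docker-registry secret that exists in the cluster. On EKS, nodes with the right IAM permissions may be able to pull from ECR without any secret at all.
    from prefect.run_configs import KubernetesRun

    run_config = KubernetesRun(
        image="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-flow:latest",
        image_pull_secrets=["ecr-registry-secret"],
    )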
    c
    t
    • 3
    • 9
  • c

    ciaran

    06/21/2021, 3:01 PM
    New GitHub iconography -> New Prefect iconography?! 🕵️
    myth busted
    😆 4
  • a

    Alexander

    06/21/2021, 5:36 PM
    We have 65 flows registered. If I want to set up cloud hooks, do I need to manually set a cloud hook for every one of the 65 flows?
    k
    z
    • 3
    • 2
  • j

    Joe

    06/21/2021, 5:43 PM
    Howdy folks - anyone hit this one before?
    [21 June 2021 1:41pm]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
    k
    • 2
    • 9
  • j

    Joseph Loss

    06/21/2021, 6:54 PM
    Hey everyone, qq: when using CodeCommit as your flow storage, is there a way to pass in the credentials / region name for the boto3 authentication via PrefectSecret()?
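    By convention, Prefect's boto helper (prefect.utilities.aws.get_boto_client) looks for a secret named AWS_CREDENTIALS, so storing the keys there should cover authentication; whether your version's CodeCommit storage picks it up is worth verifying, and the region typically comes from the AWS_DEFAULT_REGION environment variable. A sketch of the convention (values are placeholders):
    from prefect.client import Secret

    creds = Secret("AWS_CREDENTIALS").get()
    # expected shape:
    # {"ACCESS_KEY": "AKIA...", "SECRET_ACCESS_KEY": "...", "SESSION_TOKEN": "..."}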
    z
    • 2
    • 16
  • l

    Leon Kozlowski

    06/21/2021, 7:46 PM
    Is there any way to see which agent is currently running a task (specifically its label)? The use case I am looking to handle is selecting Secrets based on the agent label/environment (dev vs. prod).
    k
    m
    • 3
    • 4
  • m

    matta

    06/21/2021, 10:49 PM
    #RubberDucking So I've got my Agents running on a small server, mostly triggering Google Cloud Functions and Cloud Run jobs. Trying to think of a CI/CD process that'll make it easy to register flows. There's probably a GitHub Action that automatically uploads new commits to a server? And then have the scripts call
    flow.register()
    at the bottom. Is there probably a way to have it run
    flow.py
    upon uploading a new version?
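    One common 1.x pattern for CI (a sketch; the project name is a placeholder) is to make registration idempotent, so CI can simply execute every flow script on each push and unchanged flows are skipped:
    # at the bottom of flow.py
    flow.register(
        project_name="my-project",
        idempotency_key=flow.serialized_hash(),  # re-registers only when the flow changed
    )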
    m
    s
    • 3
    • 12
  • b

    Ben Muller

    06/21/2021, 11:53 PM
    Hey Prefect devs, I have a build step that sets KVs via
    prefect kv set
    in a Docker image. Is the only thing I need to do, so that the container is authed with my Cloud account, the following?
    export PREFECT__CLOUD__AGENT__AUTH_TOKEN=foobar
    I am getting the error:
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
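    A likely cause (an assumption worth checking): prefect kv set goes through the regular Client, which reads the client token rather than the agent one, so the container would need something like:
    export PREFECT__CLOUD__AUTH_TOKEN=foobar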
    z
    • 2
    • 2
  • s

    Sam Gibson

    06/22/2021, 4:40 AM
    Hi everyone. Very new to prefect, and just starting to evaluate how it might be able to replace our internally built tooling. There are lots of really good things that we see in the design, but it's different enough from our current approach that we're struggling to understand some usage. One big conceptual challenge is that in our current implementation the DAG is created at declaration time by saying what your dependencies are, e.g.
    @task()
    def a():
        # do stuff...
    
    @task()
    def b():
        # do stuff...
    
    @task(requires=[a, b])
    def c(a, b):
        # do stuff...
    The benefit of such an approach is that your dependencies are defined in the same place you need them (instead of at the bottom of the file in a flow block??). This is especially useful when your dependencies involve making queries to various data sources and you parameterise the queries, e.g.
    @task(requires=[
        query_table("trades"),
        query_table("orders")
    ])
    def targets(trades, orders):
        # do stuff...
    In the prefect model, from what I gather, for the above example I would instead write something like...
    @task
    def targets(trades, orders):
        # do stuff...
    
    # maybe lots of other code goes here...
    
    with Flow("targeting") as flow:
        trades = query_table("trades")(**params)
        orders = query_table("orders")(**params)
    
        targets(trades, orders)
    From a code author's point of view the prefect model is (subjectively, I admit) awkward, especially as the complexity of your pipelines grows (with tasks depending on tasks, depending on queries, etc.). I've played around with implementing some DSL on top of prefect to achieve something similar to the above, but it feels like I'm really trying to pound a square peg into a round hole. What I'd like to understand is what patterns or idioms are used for complicated pipelines, especially those spanning multiple modules/files, with multiple flows depending on one another? Are there advantages to declaring the DAG and your logic separately that I'm not understanding? Especially for pipelines where a "start" and "end" time are implicit in every task while operating on time series data.
    k
    • 2
    • 11
  • p

    Prabin Mehta

    06/22/2021, 9:21 AM
    Hi everyone, I am running Prefect using KubernetesRun with the Dask executor. I am passing the ECR repo for the image in the KubernetesRun function. Some of my flows run perfectly, while some give an error about not being able to get secrets, even though the flows which do run import the same secrets. Let me know if you have any suggestions.
    File "/usr/local/lib/python3.8/site-packages/prefect/client/secrets.py", line 137, in get
    value = secrets[self.name]
    c
    • 2
    • 7

ciaran

06/22/2021, 9:46 AM
@Prabin Mehta Do you have an example of your flow? Or a cut-down version that can reproduce this error?
p

Prabin Mehta

06/22/2021, 11:03 AM
@ciaran Below are my flows; they connect to Snowflake and create a schema. 1. This is the flow where I am passing the image and extra packages. This runs without an issue.
with Flow(name="schema-generator",state_handlers=[handler],
          storage=GitHub(repo="prefect-flows",
                         path="flows/schema_generator.py",access_token_secret="GITHUB_ACCESS_TOKEN"),
          run_config=KubernetesRun(image="prefecthq/prefect:0.14.22-python3.8",
                                env={"EXTRA_PIP_PACKAGES": "msgpack==1.0.0 lz4==3.1.1 numpy==1.18.1 prefect[snowflake]"}),
          executor=DaskExecutor("tcp://dask-scheduler:8786"),
          result=PrefectResult()) as flow_schema_generator:
    schema = Parameter("schema", default=[])
    _create_schema(schema=schema)
2. This is the flow where I am passing the ECR repo, which has the Prefect image with all dependencies installed. This is giving me the above-mentioned error in the job pods.
with Flow(name="schema-generator",state_handlers=[handler],
          storage=GitHub(repo="prefect-flows",
                         path="flows/schema_generator.py",access_token_secret="GITHUB_ACCESS_TOKEN"),
          run_config=KubernetesRun(image="ecr_link/prefect-aura-image:latest",
                                   image_pull_secrets=["secret"]),
          executor=DaskExecutor("tcp://dask-scheduler:8786"),
          result=PrefectResult()) as flow_schema_generator:
    schema = Parameter("schema", default=[])
    _create_schema(schema=schema)
c

ciaran

06/22/2021, 11:32 AM
@Prabin Mehta have you seen this thread? https://prefect-community.slack.com/archives/CL09KU1K7/p1624284251299900?thread_ts=1624283518.297500&cid=CL09KU1K7 @Tyler Wanner may have outlined what you need to do here.
p

Prabin Mehta

06/22/2021, 12:21 PM
Yes, I have used this to set up the ECR repo.
c

ciaran

06/22/2021, 12:27 PM
Did it work?
p

Prabin Mehta

06/22/2021, 12:33 PM
Thanks, it worked. I had to update the Docker image with all the dependencies, and then while creating the secret I had to pass the full URL of the Docker image to the docker-server parameter.
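For anyone finding this later, the fix sketched as commands (account id and region are placeholders; "secret" matches the name used in image_pull_secrets above):
kubectl create secret docker-registry secret \
  --docker-server=123456789012.dkr.ecr.us-east-1.amazonaws.com \
  --docker-username=AWS \
  --docker-password="$(aws ecr get-login-password --region us-east-1)"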
✅ 2
c

ciaran

06/22/2021, 12:35 PM
Glad it worked out!
👍 2