https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • h

    Hedgar

    08/21/2022, 2:03 PM
    Is it possible to stick with prefect 1 or in the nearest future it would be deprecated for prefect 2?
    ✅ 1
    a
    • 2
    • 2
  • b

    Brad

    08/21/2022, 10:18 PM
    Hey team - I had a flow run kick off unexpectedly (Cron
    10 15 * * 1-5
    and kicked off on a Saturday). How can I debug this? More details inside
    ✅ 1
    a
    h
    • 3
    • 37
  • b

    Ben Muller

    08/22/2022, 2:41 AM
    Hey - prefect 1.0 question. Is there a way for me to do a
    try except
    over my entire flow ( many tasks included ) so that I can log this to a third party ? I know I can send slack notifications through the automations but I am looking for something more granular where I can share metadata about the flow and not have to query the GraphQlApi. I would like this to be part of the flow itself.
    a
    • 2
    • 1
  • p

    Priyank

    08/22/2022, 6:12 AM
    Hi, I am using graphql api to query prefect database but getting timeout exception.
    requests.exceptions.ReadTimeout: HTTPConnectionPool(host='localhost', port=4200): Read timed out. (read timeout=15)
    How can I change or configure this timeout ? Also we're running this query for our locally hosted prefect (prefect 1.0)
    def readDatabase(query: dict) -> None:
        client = prefect.Client()
        data = client.graphql(query)
    ✅ 1
    j
    • 2
    • 1
  • m

    Malavika S Menon

    08/22/2022, 10:38 AM
    Hey, I want to create a dynamic DAG (where flows are not directly written into the code using the @flow decorator) However, I need to set dependencies for these subflows so a proper DAG is created. But Prefect 2.0 doesn't seem to have any argument for setting the upstream dependencies for flows. What could be a possible workaround for this?
    ✅ 1
    a
    • 2
    • 6
  • s

    Suresh R

    08/22/2022, 11:44 AM
    Hi! I want to pass a custom variable for my state handler to use it, how can i do that?
    ✅ 1
    a
    • 2
    • 1
  • o

    Oscar Björhn

    08/22/2022, 11:47 AM
    The other day I got some advice to look at extra_loggers for handling logs from my packages, which worked great (thanks Anna)! Then I found logging.yml, which was even better. I had some issues getting Prefect to actually respect logging.yml when I finally deployed everything, and it appears that the problem is specifically with docker infra runs. If I set up an agent for local/process runs it's all good. Is anyone aware of a workaround for this issue, for example, can I put logging.yml somewhere in my docker images to have it get picked up?
    ✅ 1
    a
    m
    • 3
    • 6
  • j

    José Duarte

    08/22/2022, 2:02 PM
    Hey all, does Prefect 2 have flow concurrency limits? I’ve read about work queues but the description on Work Queue Concurrency isn’t particularly clear:
    Each work queue can optionally restrict concurrent runs of matching flows.
    Does this mean that if I have
    Flow_A
    and
    Flow_B
    running, they can run concurrently for any concurrency limit value?
    ✅ 1
    c
    • 2
    • 5
  • p

    Pim Claessens

    08/22/2022, 2:06 PM
    Hello, I have a question about prefect 1 (1.3). We are currently running a very outdated instance on our own server (0.15.9) and we are in progress of getting that to the cloud. We are now first trying with version 1.3. But when trying to run a flow I get this error: Failed to load and execute Flow’s environment: FlowStorageError(’An error occurred while unpickling the flow:\n AttributeError(“Can\’t get attribute \‘_make_function\’ on <module \‘cloudpickle.cloudpickle\’ from \‘/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle.py\’>“)\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - cloudpickle: (flow built with \‘2.1.0\’, currently running with \‘2.0.0\’)\n - prefect: (flow built with \‘1.3.0\’, currently running with \‘0.15.9\’)\n - python: (flow built with \‘3.10.4\’, currently running with \‘3.8.12\‘)’) I have created it with prefect 1.3 on my mac, so that should work right? But it still seems to somehow mix the two old version. Any idea of where I’m going wrong?
    ✅ 1
    k
    j
    • 3
    • 10
  • j

    José Duarte

    08/22/2022, 2:28 PM
    I have the following flow:
    @flow
    def test():
        print("hello world")
    And it will fail retro-actively from time to time:
    15:18:12.953 | INFO    | prefect.agent - Submitting flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
    15:18:13.097 | INFO    | prefect.infrastructure.process - Opening process 'dangerous-ostrich'...
    15:18:13.104 | INFO    | prefect.agent - Completed submission of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087'
    15:18:30.922 | ERROR   | Flow run 'dangerous-ostrich' - Crash detected! Execution was interrupted by an unexpected exception.
    15:18:30.954 | INFO    | prefect.engine - Engine execution of flow run '8b6a74ba-6dbe-49bd-b303-d5e9c863a087' aborted by orchestrator: This run has already terminated.
    hello world
    15:18:35.274 | INFO    | prefect.infrastructure.process - Process 'dangerous-ostrich' exited cleanly.
    As you can see above, it completed successfully but at the same time it didn’t. Furthermore, the code clearly ran (
    hello world
    was printed), but it was still marked as an error. Shouldn’t the flow just be marked as completed?
    ✅ 1
    k
    r
    +2
    • 5
    • 12
  • j

    Justin Stanley

    08/22/2022, 2:32 PM
    How do you change your account username and email address in Prefect Cloud?
    ✅ 1
    j
    • 2
    • 1
  • a

    Andrew Richards

    08/22/2022, 3:03 PM
    I'm experiencing a strange issue with version 2.1.1 where a scheduled flow run will try to run a task on all three of my Kubernetes pods simultaneously. This results in one of the tasks succeeding and two of them failing. Each pod is running on its own node. I am using SequentialTaskRunner with Prefect Shell Task and Process Infrastructure. I do not want to use Kubernetes Infrastructure because I want control over using my own kubectl commands. I'd prefer to spin up Prefect Agent pods like I would with any other process I run in Kubernetes. When I manually run the flow through the UI, it works perfectly and only schedules the task to one pod in the cluster. When I auto-schedule the flow run, it attempts to run the task on all pods in the cluster simultaneously despite them running on their own nodes. The UI thinks it is only running one task which can be seen in the screenshots. Anyone know why this might be?
    ✅ 1
    j
    • 2
    • 2
  • s

    Sam Garvis

    08/22/2022, 3:56 PM
    Is there a way to declare docker image in the prefect deployment build command?
    ✅ 1
    c
    j
    n
    • 4
    • 7
  • m

    michael.urrutia

    08/22/2022, 4:25 PM
    Hi! I was curious if there was any good docs or known methods for creating rotating secrets, where behind the scenes we are updating the value of a secrets
    ✅ 1
    a
    • 2
    • 1
  • b

    Balázs Aszódi

    08/22/2022, 4:26 PM
    Hi all, I am trying to run a deployment locally on my M1 Mac. However, I am continuously getting FileNotFoundError: [Errno 2] No such file or directory error. I tried to look around for a solution in the channels and discourses but I haven’t found the same problem just some similar ones. I am trying to run with the prefect cli and I am in my flows folder.
    prefect deployment build mssql_dbx_flow.py:dbx_trigger_sync_flow -n airbyte-prefect-dbx-elt -t testing
    I have created an agent already and I’m using prefect 2.1.1. I’ve attached my orion settings, as well. Could you advise me on what am I missing?
    c
    • 2
    • 10
  • s

    Slackbot

    08/22/2022, 5:46 PM
    This message was deleted.
  • n

    Nova Westlake

    08/22/2022, 5:55 PM
    Quick question. I'm going to be building a complex data processing workflow in AWS and we plan to use our own Prefect installation for this. I'm trying to determine if it should use AWS EKS (kubernetes) or AWS ECS (elastic container service). We plan to dockerize most of our worker nodes and call those from Prefect (they'll be running on windows and need custom installed software often, not just raw python, so we need to use EC2 instances not Fargate). Would anyone have a recommendation for going with EKS or ECS for a use case like this? My assumption is that there will be more community support when using Prefect with EKS as that's a more commonly used open source solution? Just want to go with whatever makes our lives the easiest.
    c
    a
    • 3
    • 12
  • j

    Jack Chang

    08/22/2022, 6:01 PM
    Am I putting the default label flag in the wrong place? I am getting this error:
    TypeError: __init__() got an unexpected keyword argument 'add_default_labels'
    My configuration looks like this:
    # Configure a custom image
    f.run_config = DockerRun(
        labels=["grls"],
        add_default_labels=False,
    )
    ✅ 1
    m
    • 2
    • 2
  • a

    Anna Geller

    08/22/2022, 6:36 PM
    Q from @Chris Gunderson - Hi Prefect community - I'm having an issue with the parameters used in a task 🧵
    ✅ 1
    c
    • 2
    • 6
  • a

    Aaron Goebel

    08/22/2022, 7:00 PM
    is it possible to pass result data between prefect tasks that are run on different agents? Or does this require stashing the intermediate data in something like a file on S3
    r
    • 2
    • 2
  • m

    Michael Z

    08/22/2022, 7:24 PM
    Hello, quick question, I am using
    trigger=any_failed
    in my task. But I really only want to do something if the previous task has failed, and not consider all upstream tasks. Can someone point me in the right direction?
    ✅ 1
    r
    • 2
    • 3
  • w

    Wei Mei

    08/22/2022, 7:48 PM
    Hello. I tried to deploy this locally but get an exception.
    - name_input: field required
    I see that in the do not edit below line part of the deployment.yaml it has name_input set to required.
    from prefect import flow, task
    from prefect.tasks import task_input_hash
    from datetime import timedelta
    
    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
    def hello_task(name_input):
        # Doing some work
        print(f"Saying hello {name_input}")
        return "hello " + name_input
    
    @flow
    def hello_flow(name_input):
        hello_task(name_input)
    
    if __name__ == "__main__":
        hello_flow("Marvin")
    ✅ 1
    j
    • 2
    • 11
  • h

    Hedgar

    08/22/2022, 8:04 PM
    Trying to get the hang of prefect 2 for future transition, however I have the following issues After running the file a had a runtime error:”” “Orion requires sqlite>=3.24.0 but we found version 3.22.0” Secondly how can I effect schedule from local file/environment
    r
    • 2
    • 1
  • j

    Josh Paulin

    08/22/2022, 8:10 PM
    Hello. Working on the migration to 2.0, I can’t seem to find any documentation for how to migrate Results storage. In 1.0 I had an S3 bucket with a short lifecycle policy to store results/cache data between runs, but I’m not seeing where I would set that up for 2.0.
    j
    • 2
    • 6
  • n

    Nathaniel Russell

    08/22/2022, 8:19 PM
    from prefect import task, Parameter, Flow
    from prefect.storage import S3
    import prefect
    
    
    @task
    def main_task():
        print("foobar")
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>("foobar")
    
    
    storage = S3(bucket="prefect-task-behaviour", stored_as_script=True, local_script_path="main.py")
    
    def build_flow():
        with Flow("s3-to-prefect-flow", storage=storage) as building_flow:
            main_task()
        return building_flow
    
    
    print(build_flow())
    I have an S3 bucket with a copy of this code (named main.py) in it, yet I am not seeing it print out foobar, why is that?
    a
    • 2
    • 1
  • n

    Neil Natarajan

    08/22/2022, 8:21 PM
    In prefect 2.0 what is the migration path for running workflows on a simple interval schedule? I tried creating a deployment yaml and including a schedule like this
    schedule:
      interval: 75
      timezone:
    However the flow doesn't actually run and I see a bunch of
    Scheduled
    flow runs that eventually become
    Late
    j
    a
    • 3
    • 18
  • p

    Paco Ibañez

    08/22/2022, 9:41 PM
    Hello! I am trying to run a simple flow using a DockerContainer infrastructue and minio storage block. When the deployment is created, the flow is uploaded to minio but when I run the flow I get an error. Any ideas? Flow and error in thread
    ✅ 1
    a
    • 2
    • 11
  • m

    Matt Melgard

    08/22/2022, 10:39 PM
    Hey all, I’m kind of new of Prefect, and I have the Kubernetes agent installed and wired up to my Prefect cloud 2.0 tenant on a local cluster as a POC. Can anyone point me in the direction of a docs page that can get me started with triggering a flow on my k8s cluster using the Agent? I may be missing it but I haven’t been able to find anything in the docs that gives a succinct end to end example and this page seems to just give me back a 404. Do I just need to define a Deployment and specify that it be run in Kubernetes somehow?
    m
    • 2
    • 3
  • j

    Jai P

    08/22/2022, 11:51 PM
    hullo! i'm attempting to build a flow that kicks off subflows asynchronously in Prefect 2.0, and seeing an error that i'm a bit stumped on:
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
    details in the thread!
    r
    m
    • 3
    • 19
  • r

    Rio McMahon

    08/23/2022, 4:44 AM
    I am having an issue with the prefect client/accessing flow runs. When I run
    import prefect
    pc = prefect.get_client()
    await pc.read_flow_runs()
    it returns a cloudpickle error
    TypeError: __init__() missing 1 required positional argument: 'path'
    (see comments for full stack trace). Previously I was using python 3.7 and was getting another cloudpickle error about version 5 not working. Other prefect client functions work (including
    read_flow_run
    and
    read_flow_run_states
    ) version stuff:
    $ prefect version
    Version:             2.1.1
    API version:         0.8.0
    Python version:      3.9.12
    Git commit:          dc2ba222
    Built:               Thu, Aug 18, 2022 10:18 AM
    OS/Arch:             darwin/x86_64
    Profile:             goodkiwi
    Server type:         hosted
    
    $ python --version
    Python 3.9.12
    Any thoughts on how to fix?
    j
    • 2
    • 4
Powered by Linen
Title
r

Rio McMahon

08/23/2022, 4:44 AM
I am having an issue with the prefect client/accessing flow runs. When I run
import prefect
pc = prefect.get_client()
await pc.read_flow_runs()
it returns a cloudpickle error
TypeError: __init__() missing 1 required positional argument: 'path'
(see comments for full stack trace). Previously I was using python 3.7 and was getting another cloudpickle error about version 5 not working. Other prefect client functions work (including
read_flow_run
and
read_flow_run_states
) version stuff:
$ prefect version
Version:             2.1.1
API version:         0.8.0
Python version:      3.9.12
Git commit:          dc2ba222
Built:               Thu, Aug 18, 2022 10:18 AM
OS/Arch:             darwin/x86_64
Profile:             goodkiwi
Server type:         hosted

$ python --version
Python 3.9.12
Any thoughts on how to fix?
TypeError                                 Traceback (most recent call last)
File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/core/formatters.py:707, in PlainTextFormatter.__call__(self, obj)
    700 stream = StringIO()
    701 printer = pretty.RepresentationPrinter(stream, self.verbose,
    702     self.max_width, self.newline,
    703     max_seq_length=self.max_seq_length,
    704     singleton_pprinters=self.singleton_printers,
    705     type_pprinters=self.type_printers,
    706     deferred_pprinters=self.deferred_printers)
--> 707 printer.pretty(obj)
    708 printer.flush()
    709 return stream.getvalue()

File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/lib/pretty.py:393, in RepresentationPrinter.pretty(self, obj)
    390 for cls in _get_mro(obj_class):
    391     if cls in self.type_pprinters:
    392         # printer registered in self.type_pprinters
--> 393         return self.type_pprinters[cls](obj, self, cycle)
    394     else:
    395         # deferred printer
    396         printer = self._in_deferred_types(cls)

File ~/opt/anaconda3/envs/goodkiwi_env/lib/python3.9/site-packages/IPython/lib/pretty.py:640, in _seq_pprinter_factory.<locals>.inner(obj, p, cycle)
    638         p.text(',')
...
     63 @staticmethod
     64 def loads(blob: bytes) -> Any:
---> 65     return cloudpickle.loads(base64.decodebytes(blob))

TypeError: __init__() missing 1 required positional argument: 'path'
Additionally - any documentation you can point me to on how to authenticate using the REST API? I think for 1.0 bearer tokens worked based on stuff I’ve seen however I get 401 errors when I try stuff like:
import requests
import prefect
pc = prefect.get_client()

response = <http://requests.post|requests.post>(
    url=str(pc.api_url),
    json={'name':{"any_": ["flow_name"]}},
    headers = {"Authorization": "Bearer <token>"}
)
j

Joao Moniz

08/24/2022, 12:36 PM
I saw one example of an API call yesterday on Discourse: https://discourse.prefect.io/t/prefect-deployments-faq/1467#how-can-i-toggle-the-schedule-off-for-my-flow-deployment-pause-a-schedule-19
🙌 1
r

Rio McMahon

08/24/2022, 1:12 PM
Thanks @Joao Moniz
View count: 5