prefect-community
  • CA Lee

    07/12/2021, 2:40 AM
    Hello, trying to find a better way to use parameters for control flow using a dictionary (code in thread)
    10 replies · 2 participants
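    One hedged sketch of dict-style branching on a Parameter (the thread's actual code isn't shown here, and the branch tasks are hypothetical); prefect.case runs only the matching block:
    from prefect import Flow, Parameter, case, task

    @task
    def branch_a():
        return "ran a"

    @task
    def branch_b():
        return "ran b"

    with Flow("dict-control-flow") as flow:
        which = Parameter("which", default="a")
        # only the case matching the parameter value actually runs
        with case(which, "a"):
            branch_a()
        with case(which, "b"):
            branch_b()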
  • Nishtha Varshney

    07/12/2021, 11:58 AM
    Hello, I am working with a simple ML model and trying to use the Prefect UI to see its flow. I have tried almost everything to debug it, following the old threads related to pickling errors, but nothing works. Can anyone please tell me what can be done? Error - Unexpected error: TypeError("cannot pickle 'weakref' object")
    8 replies · 2 participants
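    A sketch of the usual workaround (the model type is a hypothetical stand-in): build unpicklable objects inside the task rather than at flow-definition time, so Prefect never has to cloudpickle them, and return only picklable values:
    from prefect import Flow, task

    @task
    def train():
        from sklearn.linear_model import LogisticRegression  # import inside the task
        model = LogisticRegression()
        # ... fit the model here, persist it to disk if needed ...
        return "metrics or a file path, not the raw model object"

    with Flow("train-model") as flow:
        train()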
  • Oludayo

    07/12/2021, 6:38 PM
    Hi everyone. I started using Prefect a couple of months ago and it's been a good ride so far. By default, if the run() method succeeds for a task, Prefect sets the state to Success and records any returned data. I would however like to disable this recording of returned data because of how much memory it uses: the task I run returns about 130 MB of data, and I have thousands of raw files to map with this task. I couldn't find information about this online. How do you suggest I proceed? I should also mention that I'm running the flow without Prefect Server. Thank you for your time.
    7 replies · 2 participants
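    A sketch of one common workaround (names are hypothetical): write the large payload out inside the task and return only a small reference, so the Success state records a path rather than ~130 MB of data; checkpoint=False additionally skips result persistence for the task:
    from prefect import Flow, task

    @task(checkpoint=False)
    def process(raw_path: str) -> str:
        out_path = raw_path + ".processed"
        # ... do the heavy work and write the output to out_path ...
        return out_path  # only this small string is kept in the task state

    with Flow("low-memory-map") as flow:
        process.map(["raw/file1", "raw/file2"])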
  • Joseph Loss

    07/12/2021, 8:04 PM
    Is there a way to report specific items in the Slack hook automation? Like if I wanted to report "max date downloaded", for example. I have a special notification that will do this on fail signals, but part of that process is to actually raise the signal and call signals.FAIL(). Wondering if I could just do the opposite for success? Or would it end up being two separate Slack messages, one being the automated Slack flow-success message, and the other being the custom signals.SUCCESS()?
    2 replies · 2 participants
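    A sketch mirroring the FAIL pattern the question describes: signals.SUCCESS accepts a message, which lands on the task's Success state; whether it merges with the automated flow-level Slack message or arrives separately depends on the notification setup:
    from prefect import task
    from prefect.engine import signals

    @task
    def report_max_date(max_date):
        # the message travels with the Success state
        raise signals.SUCCESS(message=f"max date downloaded: {max_date}")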
  • Blake List

    07/12/2021, 10:40 PM
    Hi there! I was wondering what is the best way to create a prefect task out of a function that is applied to each row of a dataframe. E.g. wrap something like
    df = df.apply(my_function, axis=1)
    with prefect. Thanks!
    6 replies · 3 participants
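    A minimal sketch: wrap the whole apply in a single task (my_function here is a trivial stand-in for the real row function):
    import pandas as pd
    from prefect import Flow, task

    def my_function(row):
        return row.sum()  # stand-in row function

    @task
    def load_df() -> pd.DataFrame:
        return pd.DataFrame({"a": [1, 2], "b": [3, 4]})

    @task
    def apply_my_function(df: pd.DataFrame) -> pd.Series:
        # Prefect sees the whole apply as one unit of work
        return df.apply(my_function, axis=1)

    with Flow("apply-flow") as flow:
        df = load_df()
        result = apply_my_function(df)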
  • Pedro Machado

    07/12/2021, 11:21 PM
    Hi there. I have a flow that will run on Kubernetes. I'd like to see log messages in near real time, but currently the messages only show up at the end of the run. I wonder if this has to do with the stream not being flushed frequently. How can I flush the stream after calling logger.info()? Thanks!
    6 replies · 2 participants
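    A sketch of manually flushing whatever handlers sit on the Prefect logger (the buffering cause is an assumption here, not a confirmed diagnosis):
    import prefect
    from prefect import task

    @task
    def noisy_task():
        logger = prefect.context.get("logger")
        logger.info("progress update")
        # walk up the logger hierarchy and flush any buffering handlers
        log = logger
        while log is not None:
            for handler in log.handlers:
                handler.flush()
            log = log.parent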
  • Ben Muller

    07/12/2021, 11:58 PM
    Hey community, in the Python API you can run a flow that has a schedule locally by adding
    flow.run(run_on_schedule=False)
    . I can't find this option in the CLI when running
    prefect run -p my_flow.py
    What am I missing?
    4 replies · 3 participants
  • Ben Muller

    07/13/2021, 6:14 AM
    Hey Prefect, I am getting an error on my flow in prod of
    Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    This is from my S3Result object trying to persist the results to the bucket. Not sure why this is happening as my
    task-role
    has full S3 access, and I have an identical setup in staging where I don't get these errors. Am I right in assuming that the S3Result just assumes the
    task-role
    ? Why would boto3 not be picking this up?
    ✅ 1
  • Ben Muller

    07/13/2021, 6:19 AM
    The only other thing I can think of is whether the bucket value parameter is fixed at registration time rather than at run time of the flow?
  • Ben Muller

    07/13/2021, 6:37 AM
    and that was the answer.... good to know. Hope that helps someone else out!
    ✅ 2
  • Marko Herkaliuk

    07/13/2021, 10:16 AM
    Hi, let’s say I’m using the Kubernetes agent and a flow is currently running. If CI/CD re-deploys the agent at that moment, what will happen to the running flow?
    3 replies · 3 participants
  • Laura Vaida

    07/13/2021, 10:23 AM
    Hi all, does anybody know how to trigger a specific task in a flow with a delay of a few minutes? Thanks :)
    ✅ 1
    4 replies · 2 participants
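    A simple sketch (Prefect 1.x has no per-task start-delay setting that I can confirm, so this uses an explicit wait task upstream; task names are hypothetical):
    import time
    from prefect import Flow, task

    @task
    def wait_minutes(minutes: int):
        time.sleep(minutes * 60)

    @task
    def delayed_work():
        print("running after the delay")

    with Flow("delayed-task") as flow:
        pause = wait_minutes(5)
        # delayed_work only starts once the wait task has finished
        delayed_work(upstream_tasks=[pause])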
  • Hilary Roberts

    07/13/2021, 1:35 PM
    Hi everyone! @Aaron Prescott and I are trying to set up our first flows on Prefect cloud using an aws EKS agent. Right now we are getting stuck when trying to specify the storage in aws ECR. When I run
    storage = Docker(registry_url="https://<our account id>.dkr.ecr.eu-west-1.amazonaws.com/data/prefect-application/")
    storage.build()
    I get this error
    [2021-07-13 14:32:00+0100] INFO - prefect.Docker | Building the flow's Docker storage...
    invalid reference format
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/Users/hilaryroberts/.pyenv/versions/3.8.11/lib/python3.8/site-packages/prefect/storage/docker.py", line 303, in build
        self._build_image(push=push)
      File "/Users/hilaryroberts/.pyenv/versions/3.8.11/lib/python3.8/site-packages/prefect/storage/docker.py", line 369, in _build_image
        raise ValueError(
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    It should be using the default image from the prefecthq dockerhub, so I wonder why it's failing to build. Anyone know what I might be doing wrong?
    ✅ 1
    7 replies · 2 participants
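    A hedged guess at the fix: Docker image references may not contain a URL scheme, so "invalid reference format" usually goes away once "https://" is dropped from registry_url (account id left elided as in the original):
    from prefect.storage import Docker

    storage = Docker(
        registry_url="<our account id>.dkr.ecr.eu-west-1.amazonaws.com/data/prefect-application",
    )
    storage.build()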
  • Ali Abbas Zaidi

    07/13/2021, 2:05 PM
    What's the recommended way to insert data into a postgresql table?
    1 reply · 2 participants
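    One possible approach, sketched with psycopg2 inside a task (connection details are placeholders; prefect.tasks.postgres also ships ready-made Postgres tasks):
    import psycopg2
    from prefect import Flow, task

    @task
    def insert_rows(rows):
        conn = psycopg2.connect(host="localhost", dbname="mydb", user="me", password="secret")
        try:
            # the connection context manager commits on success, rolls back on error
            with conn, conn.cursor() as cur:
                cur.executemany("INSERT INTO my_table (a, b) VALUES (%s, %s)", rows)
        finally:
            conn.close()

    with Flow("pg-insert") as flow:
        insert_rows([(1, "x"), (2, "y")])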
  • Dmitry

    07/13/2021, 2:13 PM
    Hello, everyone! I'm a beginner with Prefect. I'm trying to build a flow that runs a SQL file from GitHub. Using Prefect Secrets, I have set up the connection between my Prefect Cloud and my GitHub repo. I try to retrieve the file this way:
    from prefect.storage.github import GitHub
    my_string = GitHub(access_token_secret="github", repo="my_repo", path="etl/query.sql")
    but I only get
    <class 'prefect.storage.github.GitHub'>
    I need to get this file as a string. How can I do it?
    5 replies · 3 participants
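    A sketch of one way to read the file as a string: GitHub storage is for storing flows, not fetching files, so fetch the raw content directly (the owner placeholder and "main" branch are assumptions; the token secret name mirrors the question):
    import requests
    from prefect.client import Secret

    token = Secret("github").get()
    url = "https://raw.githubusercontent.com/<owner>/my_repo/main/etl/query.sql"
    resp = requests.get(url, headers={"Authorization": f"token {token}"})
    resp.raise_for_status()
    query = resp.text  # the SQL file contents as a string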
  • Zach Schumacher

    07/13/2021, 2:33 PM
    is there a less hacky way to create dynamic defaults for parameters as opposed to doing something like this?
    import datetime
    import typing

    from prefect import Parameter, context, task
    from pytz import timezone, utc  # assumed source of the original's `utc`/`et`

    et = timezone("US/Eastern")  # assumption: `et` was an Eastern-time tzinfo

    @task(name="set-date-params", nout=2)
    def set_date_params(
            start_date: typing.Optional[str],
            end_date: typing.Optional[str]
    ) -> typing.Tuple[datetime.date, datetime.date]:
        """
        If start date and end date are explicitly set on the Flow, we simply construct date objects and return them.

        If start_date is not set, we default it to 7 days from the local context date of the flow.

        If end_date is not set, we default it to the local context date of the flow.
        """
        logger = context.get("logger")
        context_run_datetime = context.get("date")
        aware_utc_run_datetime = context_run_datetime.replace(tzinfo=utc)
        aware_et_run_datetime = aware_utc_run_datetime.astimezone(et)

        if not start_date:
            seven_days_ago = aware_et_run_datetime - datetime.timedelta(days=7)
            start_date = seven_days_ago.date().isoformat()

        if not end_date:
            end_date = aware_et_run_datetime.date().isoformat()

        logger.info(f"start_date={start_date}, end_date={end_date}")
        return datetime.date.fromisoformat(start_date), datetime.date.fromisoformat(end_date)


    with flow:
        start_date_, end_date_ = Parameter("start-date", default=None), Parameter("end-date", default=None)
        start_date_, end_date_ = set_date_params(start_date_, end_date_)
    2 replies · 2 participants
  • Thomas Opsomer

    07/13/2021, 2:41 PM
    Hi community 🙂, I’m having a strange bug with my flows on k8s: when a task fails, the flow goes into a Failed state, but I don’t see the error in the logs, only some pod error message and a heartbeat-related failure:
    Pod prefect-job-9f127532-nnd2f failed.
    	Container 'flow' state: terminated
    		Exit Code:: 1
    		Reason: Error
    
    No heartbeat detected from the remote task; marking the run as failed.
    But when I look at the pod logs, I can see the real error. In addition, we get no Slack notification when the flow fails like that. Any idea how to overcome this?
    11 replies · 3 participants
  • domagoj jercic

    07/13/2021, 2:50 PM
    Hi everyone, I have a common set of concerns (configuration, logging) I want to set up for each task. My first idea was to override the .run() method of prefect.tasks.core.function.FunctionTask and use the Prefect @task decorator with the new FunctionTask, but that method is not called at runtime when the task executes. Is it possible to add some cross-cutting code before running a task itself? I tried a decorator, but it seems it is not picklable by cloudpickle. I'll add a code example in the comments.
    14 replies · 2 participants
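    A sketch of one pattern that sidesteps the pickling issue: subclass prefect.Task (defined at module level so cloudpickle can import it) and put the shared setup inside run() itself; all names here are hypothetical:
    import prefect
    from prefect import Flow, Task

    def apply_common_concerns(task_name: str) -> None:
        # shared configuration/logging applied by every task
        prefect.context.get("logger").info("common setup for %s", task_name)

    class MyTask(Task):
        def run(self, x: int = 1) -> int:
            apply_common_concerns(self.name)  # cross-cutting code before the work
            return x + 1

    with Flow("cross-cutting") as flow:
        result = MyTask()(x=2)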
  • Marie

    07/13/2021, 2:55 PM
    Hi! Whenever the instance running Prefect tasks crashes, heartbeats are no longer detected and the run should fail. Instead of this expected behavior, only the task currently running fails; the next ones are still
    Pending
    and the run shows as
    In progress
    until I manually cancel it. Has anyone already run into this issue?
    No heartbeat detected from the remote task; marking the run as failed.
    14 replies · 2 participants
  • Bruno Murino

    07/13/2021, 2:58 PM
    Hi everyone — is there a way to register a flow with Docker storage from within a Docker container? I’m thinking there might be something clever I can do, but I’m not sure.
    13 replies · 5 participants
  • Sarita Patel

    07/13/2021, 4:04 PM
    Hello All, I am trying to register my flow and getting an error "ValueError: Project prod not found. Run
    prefect create project 'prod'
    to create it."
    8 replies · 2 participants
  • Joseph Loss

    07/13/2021, 5:05 PM
    Hey all, has anyone had experience using the Prefect @task decorator in a Python package that can be imported and used in flow scripts? The use case here is a common database connection function that, inside the function, uses the Prefect AWS task to connect and pull credential information from AWS Secrets Manager.
    6 replies · 3 participants
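    A sketch of the layout (package and function names are hypothetical): a task defined at module level in an installed package imports cleanly into any flow script, as long as the package is also installed in the execution environment:
    # mypkg/db.py -- lives in an installable package
    from prefect import task

    @task
    def get_db_connection(secret_name: str):
        # e.g. resolve credentials here (Prefect's AWS Secrets Manager task
        # or boto3) and return an opened connection
        ...

    # flow.py -- any flow script can import the shared task
    from prefect import Flow
    from mypkg.db import get_db_connection

    with Flow("uses-shared-task") as flow:
        conn = get_db_connection("prod/db-creds")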
  • Hugo Kitano

    07/13/2021, 5:14 PM
    Hi, I’m interested in using the GraphQL API to set off a flow run with parameters. If the following is the query in the POST request to start a flow run, how would I add in any parameters I’d like to specify? Thanks!
    query = """mutation {
        create_flow_run(input: { 
        flow_id: "5ac80adc-442a-4e53-bc01-d9d65bffd6aa"}) {
            id
        }
        }"""
    21 replies · 3 participants
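    A sketch using GraphQL variables; my recollection is that create_flow_run's input also takes a JSON parameters field, but the exact schema is worth confirming against the interactive API docs:
    query = """
    mutation($flowId: UUID, $params: JSON) {
        create_flow_run(input: {flow_id: $flowId, parameters: $params}) {
            id
        }
    }
    """
    variables = {
        "flowId": "5ac80adc-442a-4e53-bc01-d9d65bffd6aa",
        "params": {"my_param": 42},
    }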
  • Joseph Loss

    07/13/2021, 5:50 PM
    Hey all, we have our Docker image stored in a private AWS ECR. Pulls seem to stop working 12 hours after starting the agent, because the authorization token expires and would somehow need to be re-authenticated with an aws ecr docker login command in the command line; otherwise the Docker agent can't pull the latest image on a flow run. I've tried passing in the environment variable "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS" when starting the agent, but that still doesn't seem to solve the issue. Any ideas on how to solve this? NOTE: AWS authorization tokens can't be changed from the 12-hour expiration. So if anyone has a way that they solved this for the Prefect Docker agent, that would be very helpful.
    5 replies · 2 participants
  • Bruno Murino

    07/13/2021, 8:08 PM
    Hi everyone I just deployed a flow with Docker storage and ECS Agent, and on my dockerfile I do:
    ENTRYPOINT ["make", "deploy"]
    and when I ran the flow, no logs came to the UI at all (as expected, I think), because the container threw an error with “make” (probably because of directory config etc., so no worries about that). So the flow is still in the “Submitted” state even though the actual ECS task has already failed. Is there any way to get better visibility when that happens? In case a dev forgets to remove the entrypoint from the Dockerfile..
    5 replies · 4 participants
  • Kathryn Klarich

    07/13/2021, 8:46 PM
    Not sure if this is the right spot for this, but it would be really helpful if the AWS BatchSubmit task printed the error message on this line rather than just catching it.
    7 replies · 3 participants
  • Vincent

    07/13/2021, 9:08 PM
    Hello - I was wondering if there is a way to prevent DockerStorage from being built / pushed when the Flow is already registered with a known idempotency_key. Ideally, the storage should only be built/pushed when the Flow has changed.
    7 replies · 2 participants
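    A sketch of the register-time knobs that seem relevant here: flow.register accepts build=False to skip building/pushing storage, and serialized_hash() gives an idempotency key; wiring up the "only build when the flow changed" check is left to the caller:
    # skip re-registration when the flow is unchanged, and skip the image build
    flow.register(
        project_name="prod",
        idempotency_key=flow.serialized_hash(),
        build=False,  # don't rebuild/push the Docker storage this time
    )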
  • Enda Peng

    07/14/2021, 2:43 AM
    Does anyone know about this issue?
    prefect backend server
    Traceback (most recent call last):
      xxxxx
      File "pyev/lib/python3.6/site-packages/prefect/utilities/notifications/__init__.py", line 1, in <module>
        from prefect.utilities.notifications.notifications import callback_factory
      File "pyev/lib/python3.6/site-packages/prefect/utilities/notifications/notifications.py", line 13, in <module>
        from toolz import curry
    ModuleNotFoundError: No module named 'toolz'
    2 replies · 2 participants
  • Blake List

    07/14/2021, 3:10 AM
    Hi there, does anyone have an example using the Great Expectations tasks? Thanks!
    1 reply · 2 participants
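    A sketch from memory of the task library's Great Expectations task; the exact run() arguments (checkpoint_name etc.) vary by version, so treat this as a starting point to check against the prefect.tasks.great_expectations docs:
    from prefect import Flow
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation

    validate = RunGreatExpectationsValidation()

    with Flow("ge-validation") as flow:
        # run a pre-configured Great Expectations checkpoint as a task
        validate(checkpoint_name="my_checkpoint")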
  • Bruno Murino

    07/14/2021, 9:42 AM
    Hi everyone — I’m using Docker storage but I want to register the same flow against 2 Prefect Servers (one is PROD, the other is UAT, and we have some others as well). All Docker images are stored in the same place, so we don’t need to build and push the Docker image every time. Is there a way to re-use the Docker storage to deploy to many servers? I tried just “not building”, because it was already built, but I got the error:
    Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
    3 replies · 2 participants
Kevin Kho

07/14/2021, 2:21 PM
Hey @Bruno Murino, I think the answer here is no, but you could try another Storage combined with DockerRun, which wouldn’t require rebuilding the container each time you register. The DockerRun will pull the image and then run the Flow on top of it. This way, you only need to store the Docker container once.
Bruno Murino

07/14/2021, 2:27 PM
I’m using an ECS Agent — what other storage can I use? I thought I could only use Docker storage..
Kevin Kho

07/14/2021, 2:30 PM
The ECSRun takes in an image, so you could provide the image this way. And then the flow could live on S3, so you can use S3 Storage + ECSRun. The flow will be downloaded from S3 and run on the specified image.
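A sketch of that suggestion (bucket and image names are hypothetical): the flow lives in S3 Storage, ECSRun supplies the pre-built image, and registering against each server never rebuilds the image:
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3

with Flow(
    "my-flow",
    storage=S3(bucket="my-prefect-flows"),
    run_config=ECSRun(image="<account>.dkr.ecr.eu-west-1.amazonaws.com/my-image:latest"),
) as flow:
    ...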