Powered by Linen
prefect-community
  • c

    Cliff Wells

    07/20/2021, 8:34 PM
    so I didn't ask to join this community, and I'm pretty annoyed that Prefect just added me without my permission.
    j
    • 2
    • 2
  • j

    Justin Liu

    07/20/2021, 8:41 PM
    Hello all, we’re trying to migrate to Git storage from GitHub in order to load multiple files in a single flow, but can’t get it to work with even just one file. This error pops up every time.
    Failed to load and execute Flow's environment: ValidationError({'type': ['Unsupported value: Git']})
    This is our code, which works with GitHub storage:
    import prefect
    from prefect.storage import Git
    from prefect.run_configs import ECSRun, LocalRun
    from prefect import task, Flow, Parameter
    from prefect.client import Secret
    
    RUN_CONFIG = ECSRun(image='image/image',
                        cpu='1 vcpu', memory='2 GB')
                      
    STORAGE = Git(repo='name/repo', flow_path='path_to_this_file', git_token_secret_name='token')
    
    @task
    def say_hello():
        logger = prefect.context.get("logger")
        logger.info("Hi")
        
    with Flow("git-storage", storage=STORAGE, run_config=RUN_CONFIG) as flow:
        say_hello()
    k
    • 2
    • 46
  • b

    Ben Muller

    07/20/2021, 9:32 PM
    This might be a silly question, but is there a way to register the same flow with different parameters? I have 3 very similar flows that need to continually run but their only difference would be a few parameters and I can't see how to do this without rewriting all three flows?
    k
    m
    • 3
    • 11
  • e

    Eric Mauser

    07/20/2021, 10:04 PM
    Okay, it seems there is a gap in my Prefect understanding. I've got a docker image with a flow inside that works locally. I'd like to run it with the ECSAgent. I have the ECS agent set up in AWS. What is the process for actually deploying my flow now?
    k
    z
    • 3
    • 9
  • h

    Hugo Kitano

    07/20/2021, 11:14 PM
    Earlier, I was having trouble running an ecs agent and was advised to invoke the command
    prefect agent ecs start
    with the parameter
    --run-task-kwargs config.yaml
    where config.yaml contains the following:
    networkConfiguration:
      awsvpcConfiguration:
        subnets:
          - 'subnet-something'
        assignPublicIp: 'ENABLED'
    I would instead like to be able to input this as a command-line argument. Will there be any fixes down the road?
    k
    • 2
    • 12
  • b

    Blake List

    07/21/2021, 12:40 AM
    Hi there, when caching data with a function that maps over rows in a dataframe, only the first row will be stored. Is there a way to cache all the mapped rows?
    k
    • 2
    • 7
  • k

    Karlygash Mukhitova

    07/21/2021, 7:29 AM
    Hello, all! Has anyone had experience scheduling KNIME workflows using Prefect?
    j
    k
    • 3
    • 3
  • g

    Goh Rui Zhi

    07/21/2021, 8:57 AM
    Hello! I was trying to migrate my legacy data pipeline into a prefect flow, by refactoring the
    main()
    of each job into a task. This runs fine, but the problem is that the logs for the functions outside of each
    main()
    method will not be logged using
    prefect.context
    and hence not appear on the prefect UI. Is there a way for Prefect to capture all logs without explicitly changing the logger for each of the sub functions to use that of
    prefect.context
    ? I know that making each job into a subprocess and then redirecting the stdout into Prefect's logger will work, but would like to avoid using subprocesses in general. Besides, this method also seems very hackish haha.
    a
    k
    • 3
    • 10
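A minimal sketch of the standard-library mechanism behind the question above: a handler attached to a parent logger also receives records from its child loggers, so helper functions do not need to change which logger they use. The logger names, the `ListHandler` class, and the `attach_handler` helper are hypothetical and only illustrate the idea. Prefect 0.x documents a `logging.extra_loggers` setting that appears to be intended for this use case, but that is an assumption to verify against the docs for your version.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for an orchestrator's log handler; stores messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.name}: {record.getMessage()}")


# Hypothetical legacy module: it logs through its own module-level logger.
legacy_log = logging.getLogger("legacy_pipeline.job_a")


def helper():
    legacy_log.info("helper ran")


def main():
    legacy_log.info("main started")
    helper()


def attach_handler(logger_name, handler, level=logging.INFO):
    """Attach `handler` to a named logger so records from its children reach it too."""
    target = logging.getLogger(logger_name)
    target.setLevel(level)
    target.addHandler(handler)
    return target


capture = ListHandler()
# Attaching to the parent "legacy_pipeline" covers "legacy_pipeline.job_a" as well,
# because log records propagate up the logger hierarchy.
attach_handler("legacy_pipeline", capture)
main()
print(capture.messages)
```

The legacy functions stay untouched; only the handler wiring is added once, before the job runs.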
  • d

    Dragan

    07/21/2021, 10:03 AM
    Hi there, could you share what you use (or what are some of the best tools) for tracking the execution of jobs in Prefect? We can use the UI to check every flow, but if our flow consists of, for instance, 20 parts (SQL queries, etc.), do you use something to track execution time? I know we can use the DB to check query execution, but is there a tool recommended by Prefect for this kind of tracking? In a dream world, that tool would show the execution of every part of the flow, so we can check over time what is going on or which jobs are problematic.
    d
    • 2
    • 2
  • d

    Dotan Asselmann

    07/21/2021, 11:17 AM
    hey guys. i’m running prefect-server on my k8s cluster. is it possible to have replications of apollo, graphql, and hasura services?
    k
    • 2
    • 2
  • r

    Ranu Goldan

    07/21/2021, 11:30 AM
    Hi everyone, I have encountered this error recently.
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 4b5a98d6-b0c0-4dc5-b540-7f00b17cea2d: provided a running state but associated flow run fc3bdb79-c506-4710-87c1-c039f29d727a is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 95, in call_runner_target_handlers
        cache_for=self.task.cache_for,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1526, in set_task_run_state
        version=version,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 4b5a98d6-b0c0-4dc5-b540-7f00b17cea2d: provided a running state but associated flow run fc3bdb79-c506-4710-87c1-c039f29d727a is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    It happens when task raised
    FAIL()
    and retries. Then the task is marked with the
    ClientFailed
    state, then the flowrun is
    Cancelled
    What I don't understand is why the error suddenly says the flow run is not in a running state. How can I resolve this?
    k
    • 2
    • 7
  • b

    Brett Naul

    07/21/2021, 1:11 PM
    my team has increasingly had trouble accessing cloud.prefect.io (just an endless spinner after logging in), to the point where someone asks daily "is prefect down for everyone?". it seems like the problem might be cache-related because often opening in another browser will do the trick, but clearing the cache in Chrome doesn't seem to be enough. in the console I see an endless looping request to
    lr-ingest.io
    , not sure if that's a symptom or a cause. any suggestions for how to further debug? I'm staving off an anti-Prefect mutiny from my team because of how painful it is for anyone to access the site 🏴‍☠
    m
    s
    m
    • 4
    • 4
  • o

    Oliver Götze

    07/21/2021, 1:34 PM
    Hi all, I am encountering a PayloadTooLargeError from Apollo when logging a large number of messages in a short period of time:
    apollo_1    | PayloadTooLargeError: request entity too large
    (full traceback in the thread) When this error is thrown, no further log entries will be written for that particular flow run. Logging continues however for other flow runs. I've been able to mitigate this problem by decreasing the number of log messages, i.e. merging similar log messages into a single entry before sending them to logging. Nevertheless I was wondering if there's a way to make the logging more resilient? Maybe Prefect Cloud performs better in such a scenario?
    k
    f
    • 3
    • 9
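The mitigation described above (merging similar log messages into a single entry before logging) can be sketched roughly as follows. `BatchedLogger` is a hypothetical helper, not part of Prefect; it assumes that identical messages can be collapsed into one line with a repeat count and that the combined entry stays small enough for the backend.

```python
import logging
from collections import Counter


class BatchedLogger:
    """Hypothetical helper: collects messages and emits them as one combined log entry."""

    def __init__(self, logger):
        self._logger = logger
        self._counts = Counter()  # keeps first-seen order of distinct messages

    def add(self, message):
        """Queue a message instead of logging it immediately."""
        self._counts[message] += 1

    def flush(self, level=logging.INFO):
        """Emit all queued messages as a single entry; return the text, or None if empty."""
        if not self._counts:
            return None
        lines = [
            f"{message} (x{count})" if count > 1 else message
            for message, count in self._counts.items()
        ]
        combined = "\n".join(lines)
        self._logger.log(level, combined)
        self._counts.clear()
        return combined
```

Inside a task, the wrapped logger could be the one returned by `prefect.context.get("logger")`, with `flush()` called once per batch of rows rather than once per row.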
  • b

    Bruno Murino

    07/21/2021, 1:34 PM
    Hi everyone — I have a mapped task and in case one of the “entries” fail, I want my handler to know what were the arguments for the one that failed, is there any way to get that? I already get the error message for each mapped task via the “map_states” attribute
    k
    • 2
    • 2
  • p

    Peter Peter

    07/21/2021, 1:54 PM
    Hello, we are having some issues with logging on k8s and Dask losing some messages. This occurs even on a simple flow. The odd thing is that if I put a small delay (5 sec) after what should be logged, the message appears in the log along with anything before it. Has anyone else noticed this?
    This sample task will not print a message to the log:
    @task(log_stdout=True)
    def print_msg():
        print("Will it work?")
    This sample task will print the message to the log:
    @task(log_stdout=True)
    def print_msg():
        print("Will it work? Yup!")
        time.sleep(5)
    Any help appreciated,
    P
    k
    m
    t
    • 4
    • 53
  • r

    Rinze

    07/21/2021, 2:56 PM
    Hi everyone, I keep running into issues trying to load my own
    config.toml
    . I've tried many things already, so I'd appreciate your input since I might be overlooking something. To keep everything together, I prefer to have the
    config.toml
    in the same git folder as my flows (
    base/src/config.toml
    ). In
    base/.env
    there are a bunch of environment variables, among others
    PREFECT__USER_CONFIG_PATH
    . Initially this worked on my local machine and the things defined in the config file were used in the flow, both on Server and when I switched to Cloud. But now I cloned the repo to a different machine and it fails to import the config.toml. Strangely enough, when I output the value of
    PREFECT__USER_CONFIG_PATH
    to the log, it is correct. More info in the thread. Is there a better way to do this, or am I forgetting something?
    k
    e
    • 3
    • 29
  • e

    Elliot Oram

    07/21/2021, 3:53 PM
    Hi all, looking for some advice about logging with prefect and threads (note: I'm no expert on any of these 3 concepts!): I have a prefect flow task that runs
    main()
    in the attached code snippet. When I execute this in a prefect flow I get the logging from main, but not from the thread. Any ideas? Thanks in advance 🙏
    Untitled
    k
    • 2
    • 7
  • k

    Krzysztof Nawara

    07/21/2021, 4:49 PM
    Is it possible to check (from inside the task) if the flow is being executed locally, without an agent, or if it's being executed in Prefect Server/Cloud by an agent? E.g. by looking up some property in the context?
    k
    • 2
    • 5
  • j

    Justin Liu

    07/21/2021, 6:39 PM
    Hi, can someone explain how exactly heartbeats work? I have a flow that reads a ton of data from Snowflake and writes it to a series of TSV files, and about 20 minutes into the run it gets marked as failed by the Zombie Killer, even though I know it’s still running. It may be helpful to mention that it’s running through a nested for loop. Digging a little deeper, I noticed that the code was printing logs about once every minute, and since the Zombie Killer only acts after 2 minutes without a signal, I find it odd that it kicks in. Also, the code ran successfully when heartbeats were turned off; I’m just wondering if there is a better alternative in case this happens in the future.
    k
    m
    • 3
    • 9
  • s

    Sean Talia

    07/21/2021, 7:10 PM
    Is there some kind of standard configuration for a "Custom Role" that would basically be a "Read Only" user with the additional enhancement of being able to execute flows on an ad hoc basis? I created a custom role that mimicked the standard "Read Only" role, and gave it
    Create
    access on
    Flow > Run
    and
    Flow > Logs
    , but the person whom I gave this role was still unable to actually create a flow run from the prefect UI
    n
    • 2
    • 13
  • d

    Darshan

    07/21/2021, 7:26 PM
    Hello, is nested mapping possible with Prefect? I am trying to do something like this: 1. The first task returns a list. 2. The second task maps over the list from the first task and returns a list again. 3. The third task should map over each element of the list returned by the second task. When I try this, the third task does not receive the individual elements from the second task's list but gets the list itself.
    k
    • 2
    • 1
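The shape problem in the question above can be illustrated in plain Python, without Prefect. Mapping the second step over a list yields a list of lists, so a third mapped step sees whole lists unless the nesting is flattened first. The three functions below are hypothetical stand-ins for the tasks. The `flatten` annotation mentioned elsewhere in this channel appears to play the role of `chain.from_iterable` here, though that correspondence is an assumption, not something confirmed in this thread.

```python
from itertools import chain


def first():
    """Stand-in for the first task: returns a list."""
    return [1, 2, 3]


def second(x):
    """Stand-in for the second task: returns a list for each mapped input."""
    return [x, x * 10]


def third(y):
    """Stand-in for the third task: expects a single element, not a list."""
    return y + 1


# Mapping `second` over the first result produces one list per input.
nested = [second(x) for x in first()]

# Flattening removes one level of nesting so each element can be mapped individually.
flat = list(chain.from_iterable(nested))

# Mapping `third` over the flattened list now receives individual elements.
results = [third(y) for y in flat]
print(results)
```

Without the flattening step, `third` would be called with `[1, 10]`, `[2, 20]`, and `[3, 30]`, which matches the behavior described in the question.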
  • f

    Frank Fineis

    07/21/2021, 7:38 PM
    Hey are there docs on Roles for Prefect Teams? Getting a 404 error when I go to https://docs.prefect.io/orchestration/concepts/roles.html
    k
    • 2
    • 2
  • k

    kiran

    07/21/2021, 9:23 PM
    Hi y’all. I’m getting started with Prefect this week and very excited! Question about script logic: does the
    with Flow(...):
    context manager basically replace using a
    main()
    function? i.e., instead of something like this:
    from prefect import task, Flow
    
    @task
    def say_hello():
        print("Hello, world!")
    
    @task
    def say_goodbye():
        print("Goodbye, world!")    
    
    
    with Flow("My First Flow") as flow:
        say_hello()
        say_goodbye()
    Would
    main()
    be used with the context manager (and is it weird/redundant to do so)? i.e., would you decorate each function with
    @task
    , order them in
    main()
    (but not decorate
    main()
    ) and then call
    main()
    from the context manager like so:
    from prefect import task, Flow
    
    
    @task
    def say_hello():
        print("Hello, world!")
    
    
    @task
    def say_goodbye():
        print("Goodbye, world!")
    
    
    def main():
        say_hello()
        say_goodbye()
    
    
    if __name__ == "__main__":
        with Flow("My First Flow") as flow:
            main()
    Thanks for any advice/insights on best practices around this!
    d
    e
    d
    • 4
    • 6
  • d

    DK

    07/21/2021, 9:58 PM
    I'm having some issues with the context variables. I seem to be getting incorrect results, or there's something obvious that I'm missing. For example,
    with Flow('Test 1') as flow:
    
        print(f"""
            flow name: {context.flow_name}
            flow id: {context.flow_id}
            flow run name: {context.flow_run_name}
            flow_run_id: {context.flow_run_id}
            """)
    results in :
    flow name: Test 1
        flow id: Test 1
        flow run name: bc5e286e-6268-43a8-aaaf-a5d03275917d
        flow_run_id: c867f4ea-1599-4201-a8e0-886cadbe0e1e
    It looks like the context variables are not entirely correct? Flow Name and Flow ID are both pulling the same field and I would expect Flow Run Name to pull something like 'violet-marmoset'. Is there an actual issue here or am I using these incorrectly?
    k
    • 2
    • 5
  • m

    Maikel Penz

    07/22/2021, 4:49 AM
    Hey when setting Docker as the storage for a flow you can specify dependencies through the
    python_dependencies
    property. However, I’m doing it dynamically and reading from a flow configured with Poetry. I noticed that the format in Poetry doesn’t match what is expected in the
    python_dependencies
    E.g: In Poetry:
    colorama = "^0.4.4"
    What I tried: `python_dependencies=["colorama^0.4.4"]` (doesn’t work). How it should be:
    python_dependencies=["colorama>=0.4.4"]
    Before I write something ugly to convert Poetry to Requirements.txt style, is there something available on Prefect to convert it or Poetry support ?
    k
    l
    • 3
    • 11
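A rough sketch of the conversion asked about above, assuming only plain numeric versions and Poetry's caret constraints. `caret_to_pip` is a hypothetical helper, not a Prefect or Poetry API. Note that a caret constraint also implies an upper bound, so `^0.4.4` maps to `>=0.4.4,<0.5.0` rather than just `>=0.4.4`. Poetry's own `poetry export -f requirements.txt` command may be a simpler route where it is available.

```python
import re

# Operators that pip already understands and that are passed through unchanged.
PIP_OPERATORS = (">=", "<=", "==", "!=", ">", "<")


def caret_to_pip(name, constraint):
    """Translate a Poetry caret constraint such as "^0.4.4" into a pip specifier."""
    constraint = constraint.strip()
    if not constraint.startswith("^"):
        if constraint.startswith(PIP_OPERATORS):
            return f"{name}{constraint}"
        raise ValueError(f"unsupported constraint: {constraint!r}")

    version = constraint[1:]
    if not re.fullmatch(r"\d+(\.\d+)*", version):
        raise ValueError(f"unsupported version format: {version!r}")

    parts = [int(part) for part in version.split(".")]
    # Caret allows changes that keep the leftmost non-zero component fixed;
    # if every component is zero, the last given component is the one bumped.
    bump = next((i for i, part in enumerate(parts) if part != 0), len(parts) - 1)
    upper = parts[:bump] + [parts[bump] + 1] + [0] * (len(parts) - bump - 1)
    upper_text = ".".join(str(part) for part in upper)
    return f"{name}>={version},<{upper_text}"


# Hypothetical dependency table, shaped like the [tool.poetry.dependencies] section.
poetry_dependencies = {"colorama": "^0.4.4"}
python_dependencies = [
    caret_to_pip(name, constraint) for name, constraint in poetry_dependencies.items()
]
print(python_dependencies)
```

Tilde constraints, wildcards, and pre-release versions are deliberately rejected with a `ValueError` rather than guessed at.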
  • t

    Tom Blake

    07/22/2021, 6:38 AM
    Hi all! What kind of agent are people using in Azure? We're tossing up between the Docker agent on Azure App Service and the K8s agent on AKS. We've managed to deploy both options, but are interested in whether there are any compelling reasons to choose the K8s approach that warrant the extra infra configuration and maintenance that comes with it.
    k
    • 2
    • 1
  • r

    Rob Fowler

    07/22/2021, 7:20 AM
    I just had to use the flatten() annotation for the first time ever. I need one of those memes for "When I need to use flatten(), I really need to use it"
    😆 2
  • d

    dipsy wong

    07/22/2021, 8:46 AM
    Hi, I would like to ask about result validation as suggested in PIN16, it seems that we can add a validator to a task and throw ValidationError if that task returns a value that fails the validator. Is that still a WIP or we can use it already or some time sooner? And btw is it by now we can only add validators to a result object and call result.validate() and throw a validation error ourselves? Thanks https://docs.prefect.io/core/pins/pin-16-results-and-targets.html
    k
    d
    • 3
    • 13
  • m

    Mohammad GH

    07/22/2021, 9:08 AM
    Hi all! Using prefect 0.14.22 as docker image, I am getting the following error:
  • m

    Mohammad GH

    07/22/2021, 9:08 AM
    09:32:17 INFO CloudFlowRunner Flow run SUCCESS: all reference tasks succeeded
    09:32:17 ERROR CloudFlowRunner Unexpected error: AttributeError("'Flow' object has no attribute 'terminal_state_handler'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 669, in get_flow_run_state
        state = self.determine_final_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 720, in determine_final_state
        if self.flow.terminal_state_handler:
    AttributeError: 'Flow' object has no attribute 'terminal_state_handler'
    z
    k
    • 3
    • 13
z

Zach Angell

07/22/2021, 1:42 PM
Hi @Mohammad GH what version of Prefect is your agent running? And what version of Prefect Core did you use to register the flow?
m

Mohammad GH

07/22/2021, 1:49 PM
Hi @Zach Angell both 0.14.22
k

Kevin Kho

07/22/2021, 1:50 PM
Hey @Mohammad GH, what is the version of Prefect in the image you are using? This error is likely a version mismatch issue somewhere. Are you using Prefect Cloud or Server also?
m

Mohammad GH

07/22/2021, 1:52 PM
Hi @Kevin Kho I am using cloud version. The prefect version of image is also 0.14.22
I also saw some threads related to this mismatch but at least I have no mismatch in any component
k

Kevin Kho

07/22/2021, 2:05 PM
What RunConfig and Storage are you using?
m

Mohammad GH

07/22/2021, 2:23 PM
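from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

# Build the flow image on top of the Prefect 0.14.22 base image.
flow.storage = Docker(
    base_image="prefecthq/prefect:0.14.22-python3.8",
    registry_url=f"{registry_url}",
    image_name=f"{image_name}",
    image_tag="latest",
    python_dependencies=[f"{pip_packages}"],
)
# Run the flow as a Kubernetes job picked up by agents labeled "k8s".
flow.run_config = KubernetesRun(
    image=f"{registry_url}/{image_name}",
    labels=['k8s']
)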
k

Kevin Kho

07/22/2021, 2:28 PM
That KubernetesRun has 0.14.22? Was it updated recently? With k8s, the default pull policy is
IfNotPresent
so there are cases where the image doesn’t update and you have to do `flow.run_config = KubernetesRun(image_pull_policy="Always")`. If you check the pod, do you see 0.14.22?
m

Mohammad GH

07/22/2021, 2:29 PM
ok, let me run a job to check your suggestion
although the image is new in each run but i will check that
@Kevin Kho thanks
it solved my issue
k

Kevin Kho

07/22/2021, 3:37 PM
Nice! Glad you got it sorted