prefect-community
  • Tomás Emilio Silva Ebensperger (03/12/2021, 3:50 PM)
    Hi! Pretty happy with how easy and straightforward the Prefect API looks. I am a bit confused about the deployment in the cloud, though. I was able to set up a simple example to execute a script every minute, and I could run the agent on my virtual machine. My question is: how do I persist this execution when I exit my virtual machine? Maybe a rookie question. Thanks for the help.
    3 replies · 3 participants
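    The usual fix is to run the agent under a process supervisor so it keeps running after you log out. A minimal systemd unit as a sketch; the unit name, user, binary path, and label below are placeholders, not from the thread:

    ```ini
    # /etc/systemd/system/prefect-agent.service (hypothetical unit)
    [Unit]
    Description=Prefect local agent
    After=network-online.target

    [Service]
    User=ubuntu
    ExecStart=/usr/local/bin/prefect agent local start --label my-vm
    Restart=always
    RestartSec=5

    [Install]
    WantedBy=multi-user.target
    ```

    Enable it with `sudo systemctl enable --now prefect-agent`. Simpler alternatives like `nohup prefect agent local start &`, tmux, or supervisord also work; systemd just adds automatic restarts on failure and reboot.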
  • curtis smiley (03/12/2021, 4:02 PM)
    Hey guys, can someone help with a ValueError("Could not infer an active Flow context.")? I have a few SQL statements that need to be run in order, so I'm using a for loop over each SQL statement. Here is a quick snippet of the loop; I can share more too!
    POINTER = None
    for statement in sql_list:
    
        task = BillingIcebergTask(statement=statement)
    
        # Pointer represents the previously set task
        if not POINTER:
            flow.add_task(task)
        else:
            flow.set_dependencies(task=task, upstream_tasks=[POINTER])
    
        # Set pointer
        POINTER = task
    12 replies · 2 participants
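    This error is raised when tasks are created or called without an active flow context. A sketch of building the same dependency chain inside a `with Flow(...)` block, which is the usual fix (the `BillingIcebergTask` body and the statements are stand-ins, since the real task isn't shown in the thread):

    ```python
    from prefect import Flow, Task

    class BillingIcebergTask(Task):
        """Stand-in for the task in the question; runs one SQL statement."""
        def __init__(self, statement, **kwargs):
            self.statement = statement
            super().__init__(**kwargs)

        def run(self):
            print("running: {}".format(self.statement))

    sql_list = ["CREATE TABLE t (x INT)", "INSERT INTO t VALUES (1)", "SELECT * FROM t"]

    # Building the chain inside the Flow context means Prefect can always
    # infer which flow each task belongs to.
    with Flow("billing-statements") as flow:
        previous = None
        for statement in sql_list:
            task = BillingIcebergTask(statement=statement)
            flow.add_task(task)
            if previous is not None:
                task.set_upstream(previous)  # run strictly after the prior statement
            previous = task
    ```

    The `POINTER` variable from the snippet becomes `previous` here; the logic is otherwise the same, just wrapped in the flow context.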
  • Charles Liu (03/12/2021, 6:13 PM)
    Hi! Do we have any Prefect logging pros in here? I can see logger.info output at the Prefect flow level and print statements at the module level, but no logger data from our own private module is making it to the Prefect logs. This is a legacy function I need to bubble up to continue implementing Prefect. Any insight is helpful! Thanks!
    8 replies · 2 participants
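    In Prefect 0.14.x, loggers from your own modules can be surfaced by listing them in the extra-loggers setting before starting the agent. A sketch; `my_private_module` is a placeholder for the actual package name:

    ```shell
    # Tell Prefect to forward records from your own module's logger
    # ("my_private_module" is a placeholder for your package name).
    export PREFECT__LOGGING__EXTRA_LOGGERS="['my_private_module']"
    # Then start the agent from this same shell so it inherits the
    # variable, e.g.: prefect agent local start
    ```

    The module must use a logger named after itself (the usual `logging.getLogger(__name__)` pattern) for the name in the list to match.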
  • John Urbanik (03/12/2021, 7:15 PM)
    Hey all, I noticed that the Prefect Cloud site mentions that Data Lineage is "coming soon." Is there any more information about how/when this is planned to be implemented? We're already using Prefect Core but are evaluating various MLOps solutions, and it'd be nice not to have to introduce too many new tools.
    3 replies · 2 participants
  • itay livni (03/12/2021, 8:02 PM)
    Hi - I have a task that went "rogue" during a run (Prefect Cloud). Is there a way to delete that specific run? It is contorting the Run History visualization.
    1 reply · 2 participants
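    Deleting a single run can be done through the GraphQL API (the Interactive API tab in the UI). A sketch; the mutation name and input shape below are from memory, so verify them against the schema browser before running:

    ```graphql
    # Paste into the Interactive API tab, substituting the real run id.
    mutation {
      delete_flow_run(input: { flow_run_id: "your-flow-run-id-here" }) {
        success
      }
    }
    ```

    The flow run id is visible in the run page's URL in the UI.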
  • Brian Mesick (03/12/2021, 8:41 PM)
    Today's weird question: is there a way to confirm what image a k8s run used? I've uploaded a new image with a new tag and confirmed that my code changes are in it, but that's definitely not the code that's executing. I think there used to be a "versions" tab that had this info, but it seems to have gone away?
    16 replies · 3 participants
  • Alex Papanicolaou (03/12/2021, 8:45 PM)
    Hey all, has anyone looked into using a progress bar like tqdm inside a task? I know there would be issues around stdout and how the Prefect logger works, but maybe someone has thought about this.
    2 replies · 3 participants
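    On the stdout concern: tqdm writes to any file-like object passed via its `file=` argument, so one approach is a small adapter that forwards progress lines to a logger. A stdlib-only sketch (tqdm itself isn't imported here; inside a Prefect 1.x task you would pass `prefect.context.get("logger")` as the logger):

    ```python
    import logging

    class LoggerWriter:
        """Minimal file-like object that forwards writes to a logger.

        Usable as tqdm(iterable, file=LoggerWriter(logger), mininterval=...).
        """
        def __init__(self, logger, level=logging.INFO):
            self.logger = logger
            self.level = level

        def write(self, message):
            message = message.strip()
            if message:  # tqdm emits carriage returns and blanks; skip those
                self.logger.log(self.level, message)

        def flush(self):
            pass  # logging handles its own flushing

    # Demonstration with a plain logger (no tqdm needed):
    logger = logging.getLogger("progress")
    writer = LoggerWriter(logger)
    writer.write("50%|#####     | 5/10")
    ```

    A caveat: tqdm redraws frequently, so a large `mininterval` (or `miniters`) is worth setting to avoid flooding the log stream with one record per redraw.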
  • Charles Liu (03/12/2021, 9:15 PM)
    Does anyone know if CodeCommit storage has to pass credentials like S3 storage does? My flow is having trouble seeing the file.
    4 replies · 2 participants
  • Rob Fowler (03/12/2021, 10:52 PM)
    Anyone already made a Redis Result class? I am using Docker storage, and I'm thinking of either LocalResult to an nfs/gfs mount or making a Redis one.
    👀 1
    12 replies · 3 participants
  • Jay Sundaram (03/13/2021, 3:53 AM)
    Hi all, what's the best way to troubleshoot the following?
    • my Docker image is in ECR
    • my ECSAgent is running on an AMI instance
    • the Prefect Cloud dashboard indicates my ECSAgent is online
    • labels match: 'prefect agent ecs start --label my-flows' and ECSRun(labels=['my-flows'])
    • I navigate to my project > Flow, select the latest version, and click QUICK RUN; then nothing happens.
    The ECSAgent doesn't find the flow to submit for execution. Since the flow is not submitted, there are no logs to inspect. I've had success with a similar setup, except that my Docker image was pulled from Docker Hub instead. Is there more logging somewhere else? Appreciate any tips!
    3 replies · 2 participants
  • Frederick Thomas (03/13/2021, 4:42 PM)
    Hello one and all. Of course I am new to Prefect, so please forgive the following code snippet. I am attempting to connect to a webhook for notification purposes in Teams. What I have is working; however, I also need the project name as well as a link to a failed flow's logs (e.g. http://172.18.1.5:8080/default/flow-run/618e28e6-e0ac-44aa-85e3-ccf4f865c833?logs=). This is what I have so far:
    import requests
    import os

    def vent(flow: str, state: str):
        msg = "Flow {0} finished in fail state '{1}'".format(flow, state)
        g = os.environ.get("TEAMS_NOTIFICATION")  # webhook link to Outlook for Teams
        requests.post(g, json={"text": msg})

    check = lambda flow, state: vent(flow.name, state.message)
    .
    .
    .
    with Flow(flow_name, schedule=None, on_failure=check) as flow:
    This is probably not optimal, but I just found the API and I don't have the time to dig in at the moment. Any help is greatly appreciated. Thanks.
    14 replies · 2 participants
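    On getting the run link into the message: in Prefect 1.x the current run's id is available at runtime as `prefect.context.flow_run_id`, so the handler can build the UI URL itself. A stdlib-only sketch; the project name is not passed to `on_failure` handlers, so the caller supplies it, and all names and the base URL here are illustrative:

    ```python
    import json
    import os
    import urllib.request

    BASE_URL = "http://172.18.1.5:8080/default"  # the Server UI address from the question

    def format_teams_message(project, flow_name, state_message, flow_run_id):
        """Build the Teams text, including a link to the failed run's logs."""
        link = "{0}/flow-run/{1}?logs=".format(BASE_URL, flow_run_id)
        return "[{0}] Flow {1} finished in fail state '{2}'. Logs: {3}".format(
            project, flow_name, state_message, link
        )

    def notify_teams(text):
        """POST to the Teams webhook stored in TEAMS_NOTIFICATION."""
        url = os.environ["TEAMS_NOTIFICATION"]
        req = urllib.request.Request(
            url,
            data=json.dumps({"text": text}).encode(),
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req)
    ```

    The handler would then call something like `format_teams_message("default", flow.name, state.message, prefect.context.flow_run_id)` before posting.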
  • CA Lee (03/14/2021, 4:42 AM)
    Hi all, using S3 for storage, ECS for execution, ECR for the image repo. Prefect version: 0.14.12, Python version: 3.8.8, Docker base image: prefecthq/prefect:0.14.12-python3.8
    My flows are registered using CI (GitHub Actions) with the following shell command in a loop over my flows, to avoid re-registration of flows on each push (adapted from this guide):
    prefect register flow --skip-if-flow-metadata-unchanged
    I develop my tasks separately from my flows, as a Python package, which then gets built as a Docker container and pushed to ECR.
    Expected behavior: the flow pulls the latest flow code from S3 (from the most recent push), which then runs the latest task code from ECR.
    Observed behavior: on each flow registration, a new file is uploaded to S3. However, on inspecting flow logs, the flow does not pull the latest version of the flow code stored in S3. This was verified by looking at the slugified-current-timestamp of the S3 flow: an earlier version of the flow is being pulled instead. I have also pulled image:latest from ECR on my machine, dropped into a shell using local run, and confirmed that my task code in the Docker container was updated, but the flow code stored on S3 is not pulling the updated task code from ECR.
    Storage:
    STORAGE = S3(bucket="aws-ecs-flows")
    Run config:
    RUN_CONFIG = ECSRun(
        image="repository-url/image:latest",
        labels=['ecs']
    )
    Re-registering the flows without specifying --skip-if-flow-metadata-unchanged results in the expected behavior, but it also leads to unwanted flow version bumps in the Prefect Cloud UI. The strange thing is that even though the tasks were developed (as a Docker container) separately from the flows, the flows don't pull the latest task code from the Docker image, even when the image tag latest is specified (the task code seems to stop at the version which identified that the flow metadata was unchanged, i.e. idempotency_key = flow.serialized_hash(), even if the task code was modified). I'd appreciate it if anyone has any insight on how to make this work for ECS! Kubernetes has a flag called imagePullPolicy: Always which may solve this issue, but I'm not aware of anything similar for ECS.
    15 replies · 4 participants
  • Warsame Bashir (03/14/2021, 4:20 PM)
    Hi, how does one deploy a flow to Prefect Cloud? My agent is running, and I've tried both flow.run_config = LocalRun() and with Flow("ETL", run_config=LocalRun(labels=["dev", "ml"])) as flow:
    4 replies · 2 participants
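    A run config alone isn't enough; the flow also has to be registered so Cloud knows about it and a matching agent can pick it up. A minimal Prefect 1.x sketch (the project name is a placeholder, and the project must already exist):

    ```python
    from prefect import Flow
    from prefect.run_configs import LocalRun

    # Either way of attaching the run config works; labels must match
    # the labels the agent was started with.
    with Flow("ETL", run_config=LocalRun(labels=["dev", "ml"])) as flow:
        pass  # tasks go here

    # Registration is the missing step: it sends the flow's metadata to
    # Prefect Cloud so it can be scheduled and handed to your agent.
    # Uncomment once authenticated, with an existing project:
    # flow.register(project_name="my-project")
    ```

    After registering, runs can be kicked off from the UI (Quick Run) or scheduled; the agent only needs to stay running.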
  • Varun Joshi (03/15/2021, 4:53 AM)
    Hey Prefecters, is there a way to delete flows en masse rather than clicking on each flow and deleting them individually? 🙂
    2 replies · 2 participants
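    There's no bulk-delete button in the UI, but deletion can be scripted against the GraphQL API. A stdlib-only sketch that just builds the request payloads; the mutation name `delete_flow` and its input type are from memory, so check them in the Interactive API schema browser, then POST each payload to your /graphql endpoint (or pass the query to `prefect.Client().graphql`):

    ```python
    # Builds GraphQL payloads to delete flows by id; nothing is sent here.
    def build_delete_flow_mutation(flow_id):
        query = """
            mutation($input: delete_flow_input!) {
              delete_flow(input: $input) { success }
            }
        """
        return {"query": query, "variables": {"input": {"flow_id": flow_id}}}

    # The ids below are placeholders; in practice you would first query
    # the API for the flow ids you want gone.
    payloads = [build_delete_flow_mutation(fid) for fid in ["id-1", "id-2"]]
    ```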
  • Thanneermalai Lakshmanan (03/15/2021, 8:28 AM)
    Hi, can we subscribe to a GraphQL subscription endpoint to listen to Prefect task/flow state?
    2 replies · 2 participants
  • Milly gupta (03/15/2021, 10:46 AM)
    Hi, I am not able to run any flow at the moment, even though the Prefect dashboard shows I am using 0 slots.
    8 replies · 2 participants
  • Vipul (03/15/2021, 11:38 AM)
    Hey all, just trying to work out the best approach to the problem of a task waiting for an external system's file to become available. To give a bit of context: I have a lot of tasks that wait for certain files to be available from an external system. If all of those tasks are waiting for files, they might soon use up the whole worker pool. I was wondering if there is a way to retry a blocking task after 10 seconds so it does not hold a worker while it isn't doing anything:
    with Flow("Get Files") as file:
        get_file_from_system_a()
        get_file_from_system_b()

    @task
    def get_file_from_system_a():
        while file_not_available:
            # Instead of holding one worker to wait for this file and causing
            # worker starvation, is there a way to retry this task after 10 seconds?
            sleep(10)
        process_task()
    24 replies · 3 participants
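    One Prefect 1.x pattern for this: instead of sleeping inside the task, raise the RETRY signal so the task exits immediately and is rescheduled after `retry_delay`, freeing the worker in between attempts. A sketch; `file_exists` and the path are placeholders for the real external-system check:

    ```python
    import os
    from datetime import timedelta

    from prefect import Flow, task
    from prefect.engine.signals import RETRY

    def file_exists(path):
        """Placeholder for the real external-system availability check."""
        return os.path.exists(path)

    # Up to 60 attempts, 10 seconds apart, gives a ~10 minute window.
    @task(max_retries=60, retry_delay=timedelta(seconds=10))
    def get_file_from_system_a(path="/data/system_a/input.csv"):
        # Fail fast with RETRY rather than sleeping in a loop, which
        # would pin a worker for the whole wait.
        if not file_exists(path):
            raise RETRY("file not available yet, retrying in 10s")
        return path

    with Flow("Get Files") as flow:
        get_file_from_system_a()
    ```

    Whether the slot is actually freed between attempts depends on the executor, so it's worth verifying under the executor in use.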
  • Will Milner (03/15/2021, 2:26 PM)
    Hello, I'm currently using ShellTask with a piped command, and it seems like Prefect only checks the exit status of the last command; i.e., if I run foo | bar it only checks the exit status of bar. Any ideas on how to make Prefect raise an error if either command fails?
    3 replies · 2 participants
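    This is the shell's default behavior rather than Prefect's: a pipeline's exit status is that of its last command. One common workaround (a sketch, not Prefect-specific) is to enable bash's pipefail option in the command itself, e.g. ShellTask(command="set -o pipefail; foo | bar"), assuming the task's shell is bash:

    ```shell
    # With pipefail, the pipeline fails if any stage fails, not just the last.
    if bash -c 'set -o pipefail; false | cat'; then
      echo "pipeline reported success"
    else
      echo "pipeline reported failure"  # this branch runs: `false` failed
    fi
    ```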
  • Ross (03/15/2021, 2:39 PM)
    Hi, I'm wondering if there is any way to control the priority of scheduled/queued flows? We have some fairly bursty workloads where we are compute-limited (planning to limit the number of flows per agent), and so we want the option to occasionally submit new flow runs with a higher priority. The only thing I can think of is maybe using agent labels, which doesn't feel like the right way!
    2 replies · 2 participants
  • Tomás Emilio Silva Ebensperger (03/15/2021, 3:01 PM)
    Hello, any recommendations for tutorials about Prefect besides the YouTube channel? Thanks.
    4 replies · 2 participants
  • itay livni (03/15/2021, 3:11 PM)
    Hi - it looks like the Prefect API docs aren't reflecting the latest version of Prefect.
    4 replies · 2 participants
  • Matthew Blau (03/15/2021, 3:47 PM)
    Hello all, I have a question regarding logging. I have a flow that largely looks like the one outlined here: https://docs.prefect.io/core/examples/functional_docker.html, which we run through the Server UI. I have noticed that the logs from the container only show up if there is a problem with the code. Is there a way, with the GetContainerLogs task, to collect the logs emitted via the Python standard logging library and have them printed in the UI for ease of use? Currently info, debug, and exception messages get logged to disk, and I would like to have both: logged to disk and shown in the UI.
    11 replies · 2 participants
  • Samuel Hinton (03/15/2021, 4:18 PM)
    Hi all! Is there a setting I can pass to the agents to say "poll every 2 seconds" or similar, instead of every 10 seconds? 10 seconds is pretty good, but we have a lot of quick flows happening, and picking them up faster would be great!
    👀 1
    13 replies · 2 participants
  • Andrew Hannigan (03/15/2021, 5:51 PM)
    Any plans for hosted flows / flow runs in the future? For clients that don't care as much about code being 100% private? Or to make it easier to just try out Prefect? I know that in some ways that feels like the antithesis of Prefect, as it's meant to be deployed on the client's cloud. But remember that git was designed as a fully distributed version control system, and that doesn't stop the majority of users from using it in a centralized workflow. For git, solving the harder version control problem turned out to yield a solution that also worked better for the easier version control problem. Maybe that would also be the case for Prefect and the workflow orchestration problem.
    2 replies · 2 participants
  • Robert Bastian (03/15/2021, 5:52 PM)
    The upgrade from 0.14.6 to 0.14.12 broke my ECS/Fargate implementation. From what I can tell, requiredCompatibilities isn't getting set correctly in the task definition that the agent registers with ECS via boto3. My flows worked on 0.14.6, but with the revamp of ECS in 0.14.12 they all get this error:
    [2021-03-15 17:48:45,887] ERROR - rai-fargate | Error while deploying flow
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/agent/agent.py", line 414, in deploy_and_update_flow_run
        deployment_info = self.deploy_flow(flow_run)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/agent/ecs/agent.py", line 322, in deploy_flow
        resp = self.ecs_client.run_task(taskDefinition=taskdef_arn, **kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/botocore/client.py", line 357, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/botocore/client.py", line 676, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
    I checked the registered task definition and I can see:
    "compatibilities": [
        "EC2"
      ],
    When I do the same with 0.14.6 I see:
    "compatibilities": [
        "EC2",
        "FARGATE"
      ],
    Thx!
    59 replies · 3 participants
  • Marwan Sarieddine (03/15/2021, 6:10 PM)
    Hi folks, I am trying to figure out if there is a simpler way than what I have in mind to ensure that only one flow run of a particular flow can be running at any point in time.
    8 replies · 3 participants
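    One approach (a sketch): have the first task of the flow query the API for runs of the same flow already in a Running state, and end the run early if any exist. The Hasura-style query below is from memory, so confirm the field names in the Interactive API schema browser:

    ```graphql
    # Any results here mean another run of this flow is already executing.
    query {
      flow_run(
        where: {
          flow_id: { _eq: "your-flow-id" }
          state: { _eq: "Running" }
        }
      ) {
        id
      }
    }
    ```

    Prefect Cloud may also offer flow-run concurrency limits by label on some plans, which would be simpler than rolling your own check if it's available.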
  • Charles Liu (03/15/2021, 6:38 PM)
    Just seeking some clarification: I was digging through old questions and came across this solution: [link: https://prefect-community.slack.com/archives/CL09KU1K7/p1615218708404000?thread_ts=1615022331.342300&cid=CL09KU1K7 ], and it says:
    # You'd set a secret in Prefect cloud containing your credentials as a JSON dictionary of the following form:
    {"ACCESS_KEY": "your aws_access_key_id here",
     "SECRET_ACCESS_KEY": "your aws_secret_access_key here"}
    
    # For now the secret name is hardcoded, so you'd need to name the secret "AWS_CREDENTIALS"
    
    # you'd then attach the secret to your `S3` storage as follows:
    flow.storage = S3(..., secrets=["AWS_CREDENTIALS"])
    Assuming AWS_CREDENTIALS has been declared on the Prefect Cloud side, is this how we pass functions into both storage=storage_type(secrets=['']) and KubernetesRun(image_pull_secrets=['']) for now?
    4 replies · 2 participants
  • Pedro Machado (03/15/2021, 7:08 PM)
    Hi there. One of my clients is considering Prefect, and someone at their company brought up (in the context of using open source tools) that, at some point, Oracle was planning to charge licensing fees for pieces of Java that were previously open source. They asked me whether Prefect uses components that may rely on Java and could be negatively impacted if there were a change in Java licensing. My understanding is that the code is all written in Python, but I thought I'd ask anyway. Thanks.
    2 replies · 2 participants
  • Justin Chavez (03/15/2021, 7:50 PM)
    Hi all, when using StartFlowRun, is it possible to see the logs for each called flow? I am locally running a parent flow that calls multiple child flows using StartFlowRun, but I can only see the success or failed status, not any of the stdout that some of the child flow tasks print.
    2 replies · 2 participants
  • Julio Venegas (03/15/2021, 8:00 PM)
    Hi everyone! I'm having issues getting logs displayed in the UI (Prefect Server, local agent) from a local package that has its own logging setup, i.e. I'm not setting up logs at the task level; I want to use the module's own defined logs. I had a look at the different threads after searching for "extra loggers", and I'm following the steps already mentioned in several threads and in the docs. I'm attaching the flow and the package code for reference. The package is adls_utils; I'm using a logging.conf file that gets read in the main module dataproject.py. The flow is adls_create_day_dirs.py. I start the local agent with local_agent_start.txt to set environment variables, i.e. that's where I set PREFECT__LOGGING__EXTRA_LOGGER. @Jim Crist-Harif saw you've helped other people with this topic before, hence the mention, but feel free to leave it for the community to answer :)
    Attachments: adls_utils.zip, adls_create_day_dirs.py, local_agent_start.txt
    6 replies · 2 participants
Kyle Moon-Wright (03/15/2021, 8:26 PM)
Hey @Julio Venegas, I don’t think we’ll be able to give you a step-by-step walkthrough to get this logger in place with your Server deployment - there is likely a lot of customization needed, are you running into a specific issue?
Julio Venegas (03/15/2021, 8:35 PM)
There's no error to report; it's more of a configuration issue. dataproject.py has a class that will be used beyond Prefect flows, so it has its own logging in place using the logging module from the standard library. The flow adls_create_day_dirs.py imports from dataproject, but the UI won't show any of the info-level logs defined in dataproject 😞 I'd like to avoid code duplication if possible; otherwise I guess I can set task-level logs.
👀 1
Kyle Moon-Wright (03/15/2021, 9:21 PM)
Hmm, since a Resource Manager is an object consisting of tasks, I think you can set log_stdout=True for each task (in your case init, setup, cleanup) to propagate those logs to the UI. I'll need to get more information on this and get back to you!
Apologies Julio, looks like this ^ isn't a great solution for this use case. After reviewing this thread on configuring a custom logger with this supplemental repo, everything in your configurations looks good. One difference I could discern is the usage of a resource manager, so it may be worth trying a single function with your logging.conf settings to narrow the issue and rule out nested logs. If things still aren't functioning, let us know and we'll get you sorted.
Julio Venegas (03/16/2021, 10:49 AM)
@Kyle Moon-Wright I HAD A FREAKING TYPO:
PREFECT__LOGGING__EXTRA_LOGGER="['adls_utils']"
EXTRA_LOGGER instead of EXTRA_LOGGER*S*. The logs are working now 🙂 Still kind of afraid that I have no proper understanding of the logging functionality in Prefect, but I'm still quite new to the project. Thanks a lot for the help and for being so responsive!
Kyle Moon-Wright (03/16/2021, 3:31 PM)
Oh brother, I should've caught that, haha. Nice catch! 🚀