Powered by Linen
prefect-community
  • t

    Tim Galvin

    11/07/2022, 9:12 AM
    What is the correct way of specifying a 'dummy' storage block for a deployment? My code that I have wrapped in prefect tasks correctly deals with absolute paths. Additionally, the data I am working with are on the order of 2TB, and are not really appropriately set up for the default local file system logic used when
    --storage-block
    is unset to copy. I thought I could set up a 'dummy' local file system block in the Orion UI (in my own managed server, not prefect cloud), however the
    prefect deployment build
    command says
    'github', 's3',  'gcs', 'azure', 'smb'
    are supported types. TL;DR - I need to set a
    --storage-block
    in my deployment, and I am reasonably certain in my situation I do not want to be copying anything to / from different file systems and blocks. I have a common underlying filesystem at the HPC center, and my data are pretty large -- large enough where I can not reasonably expect copying to and from the disk to be feasible
    k
    a
    • 3
    • 6
  • s

    Sunjay

    11/07/2022, 10:19 AM
    Hi, can someone tell me how to pass Service Account Info to the prefect_gcp block? I wanted to pass the values from a Secret block, but Prefect 2 doesn't seem to support JSON in the Secret block. What is the best way to get a GCP BigQuery client from a dictionary, i.e. a way to pass a dictionary from any of the blocks so that I can pass the Service Account Info as a parameter to obtain the BigQuery client?
    k
    • 2
    • 3
  • r

    Rabea Yousof

    11/07/2022, 11:20 AM
    Hi everyone, I'm trying to use a Postgres database as the Prefect backend. I added the connection string and started Orion; all the tables were created in my DB, but some fields are missing and I got this error. Can anyone help me?
    t
    • 2
    • 7
  • t

    Thom

    11/07/2022, 11:34 AM
    Hi! When using the Github storage block to connect to a private repo (using Prefect version 2.6.6), I get a password related error (details in the first reply) when running a flow, what am I doing wrong?
    ✅ 1
    • 1
    • 3
  • s

    Stéphan Taljaard

    11/07/2022, 1:12 PM
    Hi. I'm not missing anything specific from this, but wanted to make sure: Prefect 2 does not have an alternative for Prefect 1's
    Flow(...).validate()
    , right?
    ✅ 1
    k
    m
    • 3
    • 4
  • v

    vholmer

    11/07/2022, 1:18 PM
    Hi, when running prefect deployment build with an infrastructure block I want to override the values of an array in the yaml, does anyone here know how I can do that? 🤔
    ✅ 1
    k
    • 2
    • 10
  • m

    Mark Li

    11/07/2022, 1:58 PM
    Hi All, I’m working on moving away from using the Bitnami/Postgres subchart to an Azure DB for Postgres instance. After fixing the dialect name, I’m currently running into this error (will post in thread).
    ✅ 1
    r
    • 2
    • 18
  • b

    Blake Stefansen

    11/07/2022, 2:06 PM
    Hi Everyone, My team is getting k8 deployment jobs where the job log states the flow run status cannot transition to
    RUNNING
    Engine execution of flow run '148d81ea-3cfe-4db1-a0d4-3f3f17748fb0' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
    Will add more details to thread
    m
    b
    +2
    • 5
    • 8
  • m

    Mohamed Alaa

    11/07/2022, 2:22 PM
    Hello guys, ever since I updated Prefect to 2.5+ I cannot access the default blocks from the Prefect UI. I don't know if that only occurs for me, but when I downgrade Prefect to 2.3 the blocks seem to show up again. Any solutions for that?
    👀 1
    k
    l
    • 3
    • 5
  • j

    JV

    11/07/2022, 3:15 PM
    Hi Everyone, I am trying to trigger a Prefect flow deployed on Cloud whenever a file lands in an S3 bucket, using an AWS Lambda function and following this documentation from Chris. The Lambda function is failing with
    "errorMessage": "HTTP Error 404: Not Found"
    I am passing API URL
    <https://api.prefect.io>
    . I understand that this documentation is not the latest, so I also tried the URL
    <https://api.prefect.cloud/>
    and got the same error. I would appreciate your input on this error in Prefect 2.0.
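A possible cause of the 404, offered as a hedged sketch rather than a confirmed diagnosis: in Prefect 2 Cloud the API routes sit under an account and workspace path rather than at the bare host. The example below builds a request to the `create_flow_run` route of a deployment. All IDs, the API key, and the `s3_key` flow parameter are hypothetical placeholders.

```python
import json
import urllib.request

# Hypothetical placeholders: replace with your own IDs and API key.
ACCOUNT_ID = "your-account-id"
WORKSPACE_ID = "your-workspace-id"
DEPLOYMENT_ID = "your-deployment-id"
API_KEY = "your-api-key"


def build_create_flow_run_request(parameters):
    """Build (but do not send) a POST request that creates a flow run."""
    url = (
        "https://api.prefect.cloud/api"
        f"/accounts/{ACCOUNT_ID}/workspaces/{WORKSPACE_ID}"
        f"/deployments/{DEPLOYMENT_ID}/create_flow_run"
    )
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        },
    )


def lambda_handler(event, context):
    # Read the object key from the standard S3 event notification layout.
    key = event["Records"][0]["s3"]["object"]["key"]
    # "s3_key" is an assumed flow parameter name, used only for illustration.
    request = build_create_flow_run_request({"s3_key": key})
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```

Keeping the request construction separate from the handler makes it easy to print the final URL and compare it against the workspace URL shown in the Cloud UI.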
    ✅ 1
    r
    n
    • 3
    • 12
  • b

    Bradley Hurley

    11/07/2022, 4:23 PM
    Hi Prefect Folks - Is it possible to adjust the log level of a running flow? 🧵
    k
    • 2
    • 8
  • f

    Fancy Arora

    11/07/2022, 5:42 PM
    Hi Everyone, is it possible to use the below thing for prefect 2.0? https://docs-v1.prefect.io/orchestration/recipes/configuring_storage.html#providing-the-dockerfile
    m
    • 2
    • 1
  • j

    Jeff Hale

    11/07/2022, 5:54 PM
    https://prefect-community.slack.com/archives/C036FRC4KMW/p1667831912413949
    🎉 2
    🙌 2
    :marvin: 2
  • l

    Lukasz Mentel

    11/07/2022, 6:44 PM
    Hi All, Would anyone have experience passing
    io.StringIO
    objects between tasks in prefect 2.0? Here's a minimal example I'm trying to get to work but getting weird results, appreciate any help on this.
    import io
    from prefect import task, flow
    
    @task
    def string_to_io():
        return io.StringIO("hello prefect")
    
    @task
    def consume(string_io):
        print(type(string_io))
        return string_io.read()
    
    @flow
    def test():
        # s = string()
        str_io = string_to_io()
        r = consume(str_io)
        print(r)
    
    test()
    when I run it I get
    19:38:59.511 | INFO    | prefect.engine - Created flow run 'congenial-longhorn' for flow 'test'
    19:38:59.708 | INFO    | Flow run 'congenial-longhorn' - Created task run 'string_to_io-78a25967-0' for task 'string_to_io'
    19:38:59.709 | INFO    | Flow run 'congenial-longhorn' - Executing 'string_to_io-78a25967-0' immediately...
    19:38:59.768 | INFO    | Task run 'string_to_io-78a25967-0' - Finished in state Completed()
    19:38:59.789 | INFO    | Flow run 'congenial-longhorn' - Created task run 'consume-884672dc-0' for task 'consume'
    19:38:59.790 | INFO    | Flow run 'congenial-longhorn' - Executing 'consume-884672dc-0' immediately...
    <class 'list'>
    19:38:59.828 | ERROR   | Task run 'consume-884672dc-0' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1222, in orchestrate_task_run
        result = await run_sync(task.fn, *args, **kwargs)
      File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/home/lukasz/projects/zlotyryjek/crap.py", line 18, in consume
        return string_io.read()
    AttributeError: 'list' object has no attribute 'read'
    So for some reason
    StringIO
    is converted to an empty list 🤔
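One possible workaround, sketched under the assumption that the file-like object is being altered while it is passed between tasks: hand the plain string from one task to the next and rebuild the `io.StringIO` inside the task that needs it. The `try`/`except` fallback is only there so the sketch also runs where Prefect is not installed, and the function names are illustrative.

```python
import io

try:
    from prefect import flow, task
except ImportError:
    # Assumption: no-op decorators so the sketch runs without Prefect.
    def task(fn):
        return fn

    def flow(fn):
        return fn


@task
def make_text():
    # Return a plain str rather than an io.StringIO object.
    return "hello prefect"


@task
def consume(text):
    # Rebuild the file-like object locally, inside the task that needs it.
    string_io = io.StringIO(text)
    return string_io.read()


@flow
def string_io_flow():
    text = make_text()
    return consume(text)


if __name__ == "__main__":
    print(string_io_flow())
```

Plain strings pass between tasks unchanged, which avoids depending on how a stateful, iterable object is handled in transit.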
    j
    m
    • 3
    • 11
  • k

    Kalise Richmond

    11/07/2022, 7:07 PM
    https://prefect-community.slack.com/archives/C036FRC4KMW/p1667848041248719
    🙌 4
    🚀 4
    :blob-attention-gif: 5
    :party-parrot: 4
  • n

    Nick Coy

    11/07/2022, 9:12 PM
    I have noticed that flow runs seem to get stuck in a running state. This seems to be happening more and more frequently. We are using Prefect 2.4.0 and using K8s and GCS for infrastructure. I have found that for these flow runs I am seeing logs on the agent like this
    prefect.agent - An error occured while monitoring flow run <flow_run_id> The flow run will not be marked as failed, but an issue may have occurred.
    m
    k
    • 3
    • 18
  • p

    Philip MacMenamin

    11/07/2022, 9:28 PM
    previously I had something along the lines of
    with Flow(
        "...",
        state_handlers=[utils.notify_completion, utils.notify_running],...
    with
    def notify_running(flow: Flow, old_state, new_state) -> State:
    And I could ask what the new state is doing. eg if
    if new_state.is_running()
    I'm reading https://docs.prefect.io/concepts/states/ to try to work out how to capture these flow-level state changes and am struggling. Any pointers?
    ✅ 1
    m
    • 2
    • 26
  • t

    Tim Ricablanca

    11/07/2022, 10:09 PM
    hi folks! long time listener, first time caller 😄 — i have a question about upgrading and downgrading the database (we’re using a postgres container) along side prefect versions. i ran into an issue when trying to rollback from 2.6.6 to 2.6.4 and got this error on orion startup:
    alembic.util.exc.CommandError: Can't locate revision identified by '41e5ed9e1034'
    i’ll post some details in a thread about what i tried before nuking the database and starting over, but 1) is there guidance on how i should be using
    prefect orion database stamp
    or
    … revision
    before upgrading orion from 2.6.4 -> 2.6.6 (or any other minor version) ? and 2) should i be running
    … downgrade
    from the 2.6.6 package or 2.6.4 package if i wanted to go from 2.6.6->2.6.4?
    ✅ 1
    m
    m
    • 3
    • 15
  • z

    Zac Hooper

    11/07/2022, 11:53 PM
    Does the Orion Server produce any more logs beyond the ones that appear in the agent's logs? I can't seem to work out what is causing my flow to crash. I have a flow that runs every minute but at least once a day it will fail due to:
    Crash detected! Execution was interrupted by an unexpected exception.
    Looking over the logs produced by the agent I can't see the Exception thrown. The flow looks like below. I've changed the variables used but the premise is that two lambdas are invoked but the 2nd needs to wait until the first is completed.
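    import json
    import os
    
    import boto3
    from botocore.config import Config
    from prefect import flow, get_run_logger, task
    from prefect.deployments import Deployment
    from prefect.filesystems import S3
    from prefect.orion.schemas.schedules import CronSchedule
    from utils.aws import invoke_lambda  # project helper; its definition is shown inline below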
    
    # --- Flow Details --- #
    client = "example_client"
    project = "example_project" 
    flow_name = "Example Name"
    description = """
        Example Description
    """
    tags = ["example"]  # Optional list of string tags for the flow. Helps to filter in UI
    schedule = CronSchedule(cron="* 7-19 * * MON-FRI", timezone="Australia/Sydney")
    
    # --- Other Flow Details (Leave as is) --- #
    work_queue = "example_workqueue"
    storage = S3.load("s3-storage")
    output_file_name = f"{os.path.splitext(os.path.basename(__file__))[0]}.yaml"
    
    def invoke_lambda(function_full_name, payload, blocking=False, **kwargs):
        """
        Invokes the given lambda function in AWS.
    
        Args:
            function_full_name (str): Exact name of the lambda function
            payload (dict): Dictionary of data you may want to pass to lambda
            blocking (bool, optional): If you want to wait for a response from the lambda. Defaults to False.
        """
        config = Config(max_pool_connections=250, read_timeout=900, connect_timeout=900)
        lambda_client = boto3.client("lambda", config=config)
        # Change client's region
        if kwargs.get("region"):
            lambda_client = boto3.client(
                "lambda", config=config, region_name=kwargs.get("region")
            )
    
        # DO NOT BLOCK IF WILL RUN OVER 30 SECONDS
        payload = bytes(json.dumps(payload), encoding="utf8")
    
        if blocking:
            response = lambda_client.invoke(
                FunctionName=function_full_name,
                InvocationType="RequestResponse",
                LogType="Tail",
                Payload=payload,
            )
            res_payload = response.get("Payload").read()
            return json.loads(res_payload)
        else:
            response = lambda_client.invoke(
                FunctionName=function_full_name,
                InvocationType="Event",
                LogType="None",
                Payload=payload,
            )
            return response
    
    @task
    def first_lambda_to_invoke():
        logger = get_run_logger()
        res = invoke_lambda("first_lambda_to_invoke", {}, True)
        logger.info(res)
        return ""
    
    
    @task
    def second_lambda_to_invoke():
        logger = get_run_logger()
        res = invoke_lambda("second_lambda_to_invoke", {}, True)
        logger.info(res)
        return ""
    
    
    @flow
    def example_flow():
        x = first_lambda_to_invoke.submit()
        y = second_lambda_to_invoke.submit(wait_for=[x])
    
    
    if __name__ == "__main__":
        Deployment.build_from_flow(
            flow=example_flow,
            name=flow_name,
            work_queue_name=work_queue,
            tags=[client, project] + tags,
            schedule=schedule,
            description=description,
            output=output_file_name,
            storage=storage,
            apply=True,
            path="/",
        )
    Here is an example of the log output from a failed run
    Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
    09:32:10 AM
    Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
    09:32:10 AM
    Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
    09:32:10 AM
    Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
    09:32:10 AM
    Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
    09:32:10 AM
    Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
    09:32:10 AM
    Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
    09:32:10 AM
    Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
    09:32:10 AM
    Crash detected! Execution was interrupted by an unexpected exception.
    09:32:10 AM
    first_lambda_to_invoke-ec90046a-0
    {'statusCode': 200}
    09:32:21 AM
    first_lambda_to_invoke-ec90046a-0
    Crash detected! Execution was interrupted by an unexpected exception.
    Note the
    {'statusCode': 200}
    response from the Lambda showing that it ran successfully.
    ✅ 1
    m
    • 2
    • 13
  • b

    Ben Muller

    11/08/2022, 1:57 AM
    Hey - what is the Prefect 2.0 equivalent of how I would do
    .set_dependencies(
                upstream_tasks=[my_upstream_task]
            )
    Oh - I can see in the message above there is a
    wait_for
    kwarg in the submit handler... Ignore me 👀
    ✅ 1
    z
    • 2
    • 1
  • f

    Faheem Khan

    11/08/2022, 4:09 AM
    Prefect 2.6.6, local server, Docker containers. Hi all, I am consistently getting two errors in the agent logs after some time, even when no flows are running. These errors sometimes cause my flows to fail later on.
    BrokenPipeError: [Errno 32] Broken pipe
    ConnectionResetError: [Errno 104] Connection reset by peer
    . Any leads would be much appreciated, cheers
    m
    • 2
    • 2
  • e

    Evgeny Ivanov

    11/08/2022, 5:46 AM
    I wonder if there is a recommended way to clear task input cache? Details below. Suppose I've set up and ran a task with
    cache_key_fn=task_input_hash
    and
    cache_expiration=timedelta(days=1)
    . Now for some reason I want to clear the cache and run the task again (suppose I've changed the logic in a module I'm using from the task). My options (including non-working ones):
    1. Deleting the cache files doesn't work. If I run a task after deleting the files I get an error. Prefect doesn't check whether the file exists before deciding to use the cache.
    2. Deleting flow runs or task runs does work, but it's inconvenient for two reasons:
       a. I have to delete the run history.
       b. I have to remember which flow/task run to delete, or delete all of them.
    3. Adding an extra parameter
    cache_num
    to the task should work. I can just change its value every time I want to avoid using the cache. But it generates extra boilerplate code, and I have to change flow/task code to change the parameter value. It looks like dirty duct tape to me.
    4. Changing data in the Orion DB may be possible, but I'm sure that it's strongly discouraged, and I'd like to avoid it.
    Maybe there is a better option I don't know about? A perfect solution would be a CLI or API with the flow/task name as an input parameter.
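A variant of option 3 that avoids touching the flow or task code, given as a minimal sketch and not as a built-in Prefect feature: a custom cache key function that mixes an externally supplied version string into the key. It follows the `(context, parameters)` signature that `cache_key_fn` callables receive. The `CACHE_VERSION` environment variable name and the function name are hypothetical.

```python
import hashlib
import json
import os


def versioned_cache_key(context, parameters):
    """Build a cache key from the task inputs plus an external version string."""
    # CACHE_VERSION is a hypothetical environment variable; changing its
    # value yields different keys, so previously cached results are skipped.
    version = os.environ.get("CACHE_VERSION", "0")
    # default=repr keeps non-JSON-serializable inputs from raising here.
    payload = json.dumps(parameters, sort_keys=True, default=repr)
    return hashlib.sha256(f"{version}:{payload}".encode("utf-8")).hexdigest()
```

It would be passed as `cache_key_fn=versioned_cache_key` in the task decorator in place of `task_input_hash`. Bumping the variable in the environment where the flow runs then forces a fresh run while leaving the run history untouched. Unlike `task_input_hash`, this sketch hashes only the parameters and the version string.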
    ✅ 1
    ➕ 1
    t
    j
    • 3
    • 4
  • m

    Michiel Verburg

    11/08/2022, 8:10 AM
    I wonder if anyone has a similar use case to ours, where we want to let an application (so end users, indirectly) trigger tasks in our Prefect installation. So basically event-driven. I know it’s possible, but I wonder what the best practice is. Currently we are thinking we should add an abstraction layer, especially so that we have a consistent API, since the workings of it might change (I believe the way we query a flow/deployment changes when we switch to Kubernetes, for example, and we want our application back end to be agnostic of that, especially because different people are responsible for it). But is it silly to put a FastAPI in front of Prefect, which also uses FastAPI under the hood? Or does it make perfect sense from the perspective of abstraction?
    k
    • 2
    • 1
  • i

    Ikkyu Choi

    11/08/2022, 8:13 AM
    Hi all, I wonder whether
    is_schedule_active
    indicates the status of a flow (turned on or off)? Using GraphQL, I got true as the value of a flow's is_schedule_active, but in my Prefect UI this flow is turned off. Am I misunderstanding something?
    1️⃣ 1
    m
    • 2
    • 1
  • r

    Rohith

    11/08/2022, 11:05 AM
    Hi All,
    👋 1
  • r

    Rohith

    11/08/2022, 11:05 AM
    I have a flow that executes sql query on redshift using prefect-sqlalchemy and my queries are always aborted
    ✅ 1
    j
    k
    • 3
    • 11
  • p

    Piotr Bródka

    11/08/2022, 11:17 AM
    Hi, I have a question about Prefect working with Ray: https://prefecthq.github.io/prefect-ray/ Is it possible to run tasks on many machines using this setup? Does it create a Ray cluster in that case?
    ✅ 1
    o
    r
    • 3
    • 2
  • e

    Evgeny Ivanov

    11/08/2022, 11:36 AM
    Hi again! I'm trying to set up SQLAlchemy Database Credentials through Prefect UI. What is the format for the Connect Args field? Is it
    param=value
    ,
    'param': 'value'
    or something else? Anyway, after editing this field I cannot create the Block. Chrome gives me the following error. UPD. I've found out that I have to fill Query and Connect Args either with some values or with
    null
    . And the right format for Connect Args is JSON:
    {
      "param": "value"
    }
    P.S. Btw, I cannot create Block in Firefox. Is this browser not supported?
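To make the format difference concrete, here is a small sketch of why the JSON form is accepted while the other two candidates are not. The `parse_connect_args` helper is hypothetical and only mimics the rule described above (a JSON object or `null`); it is not how the UI actually validates the field.

```python
import json


def parse_connect_args(raw):
    """Parse a Connect Args value given as JSON text (an object or null)."""
    value = json.loads(raw)  # raises json.JSONDecodeError on invalid JSON
    if value is not None and not isinstance(value, dict):
        raise ValueError("Connect Args must be a JSON object or null")
    return value


if __name__ == "__main__":
    print(parse_connect_args('{"param": "value"}'))
    print(parse_connect_args("null"))
    for bad in ("param=value", "'param': 'value'"):
        try:
            parse_connect_args(bad)
        except ValueError as exc:  # JSONDecodeError is a ValueError subclass
            print(f"rejected {bad!r}: {exc}")
```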
    j
    • 2
    • 2
  • d

    Denis

    11/08/2022, 1:08 PM
    Hello, what is the proper way of triggering the same flow with different parameters in Prefect 2 and then waiting for all of the runs to finish? Example of what I want to achieve:
    flow_run_ids=[]
    for param in params:
        flow_run_id = create_flow_run(
                flow=flow_model,
                parameters=run_params,
                name=run_name,
                state=state
            )
        flow_run_ids.append(flow_run_id)
    
    wait_for_flows(flow_run_ids)
    Thanks for any help
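One way this could look in Prefect 2, sketched under the assumption that the runs can be subflows of a single parent flow in the same process (not separate deployment runs): define the child as an async flow and wait for all calls with `asyncio.gather`. The flow names and the doubling logic are illustrative, and the fallback decorator only exists so the sketch runs without Prefect installed.

```python
import asyncio

try:
    from prefect import flow
except ImportError:
    # Assumption: no-op decorator so the sketch runs without Prefect.
    def flow(fn):
        return fn


@flow
async def child_flow(param):
    # Stand-in for the real work done per parameter set.
    await asyncio.sleep(0)
    return param * 2


@flow
async def parent_flow(params):
    # Start one child run per parameter and wait until all have finished.
    results = await asyncio.gather(*(child_flow(p) for p in params))
    return list(results)


if __name__ == "__main__":
    print(asyncio.run(parent_flow([1, 2, 3])))
```

`asyncio.gather` returns the results in the same order as the inputs, so each result can be matched back to its parameter.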
    m
    • 2
    • 2
  • s

    Sunjay

    11/08/2022, 2:14 PM
    Hi guys, can someone tell me how to make the task runs created by calling .map() execute sequentially? Right now when I trigger the flow, all the individual task runs execute in parallel. I want to run them one after the other. Can someone please guide me with this?
    ✅ 1
    p
    j
    • 3
    • 6
s

Sunjay

11/08/2022, 2:14 PM
Hi guys, can someone tell me how to make the task runs created by calling .map() execute sequentially? Right now when I trigger the flow, all the individual task runs execute in parallel. I want to run them one after the other. Can someone please guide me with this?
✅ 1
p

Peyton Runyan

11/08/2022, 2:50 PM
Hey there! You can just run them in a for-loop for sequential execution.
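The for-loop suggestion can be sketched as follows. Calling a task directly inside a flow (without `.submit()` or `.map()`) blocks until that task run finishes, so the items are processed one after another. The task body and names are illustrative, and the fallback decorators only exist so the sketch runs without Prefect installed.

```python
try:
    from prefect import flow, task
except ImportError:
    # Assumption: no-op decorators so the sketch runs without Prefect.
    def task(fn):
        return fn

    def flow(fn):
        return fn


@task
def process(item):
    # Stand-in for the real per-item work.
    return item * 2


@flow
def sequential_flow(items):
    results = []
    for item in items:
        # A direct call waits for this task run before the loop continues.
        results.append(process(item))
    return results


if __name__ == "__main__":
    print(sequential_flow([1, 2, 3]))
```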
🙌 1
s

Sunjay

11/08/2022, 4:07 PM
Got it, thanks @Peyton Runyan! That worked. I was just trying the lazy approach of reusing most of my existing Prefect 1 code while migrating it to Prefect 2. @Peyton Runyan One last question: can we set a task to a success state, i.e. execute a task based on a condition, or else set it to success and trigger the downstream task?
j

Jeff Hale

11/08/2022, 6:40 PM
For your second question, Prefect 2 has a Completed run state and other states, but there is not a Success run state. In Prefect 2, you can just bring your Python code and raise an error if something fails - otherwise the next task in the flow will execute and you’ll be on your way.
🙌 1
s

Sunjay

11/15/2022, 12:08 PM
@Jeff Hale Can you please help me with this? How do I explicitly set the state of a task to completed?
if conditions == "Foo":
    a = task1()
else:
    "set task1() state to success"  # i.e. skip execution of task1()
b = task2()  # task2 uses wait_for on the completed state of task1
So this is what I want to achieve in essence. I just wanted to know how to set the task1 state to completed as it can be skipped in case it doesn't meet a certain condition but the downstream job depends on the success state of the task1 using wait_for parameter.
j

Jeff Hale

11/15/2022, 1:43 PM
I don’t believe you can directly set the state. However, would something like the following do what you want?
def flow1(x):
    if x == "Foo":
        a = task1()
    else:
        a = "didn't run task1"
    # Run task2 in both branches so that b is always defined.
    b = task2()

    if a != "didn't run task1" and b:
        # do something
        pass
🙌 1
s

Sunjay

11/15/2022, 2:53 PM
@Jeff Hale Thank you so much