prefect-community
  • l

    Louis Amon

    08/29/2022, 1:13 PM
    Hello, How would one go about implementing API rate limits with Prefect ? I saw in the docs that you can define per-tag concurrency limits, but what about rate limiting (e.g. I can run only 40 API calls per seconds for API #1, 100 calls/s for API #2, etc.) ?
    j
    • 2
    • 6
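    One way to approach the rate-limiting question above, independent of any Prefect feature, is a client-side token bucket that each task calls before hitting the API. The following is a minimal sketch only: `TokenBucket`, `limiters`, and `call_api` are hypothetical names, not part of Prefect. It limits calls only within a single process, so it assumes the tasks share one Python process (for example, threads).

    ```python
    import threading
    import time


    class TokenBucket:
        """Allow roughly `rate` acquisitions per second, with bursts up to `capacity`."""

        def __init__(self, rate, capacity=None, clock=time.monotonic, sleep=time.sleep):
            if rate <= 0:
                raise ValueError("rate must be positive")
            self.rate = float(rate)
            self.capacity = float(capacity if capacity is not None else rate)
            self._tokens = self.capacity
            self._clock = clock
            self._sleep = sleep
            self._last = clock()
            self._lock = threading.Lock()

        def _refill(self):
            # Add tokens for the time elapsed since the last check, capped at capacity.
            now = self._clock()
            self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
            self._last = now

        def try_acquire(self):
            """Take one token if available; return False instead of waiting."""
            with self._lock:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return True
                return False

        def acquire(self):
            """Block until a token is available, then take it."""
            while not self.try_acquire():
                self._sleep(1.0 / self.rate)


    # One limiter per external API, using the limits from the question.
    limiters = {"api_1": TokenBucket(rate=40), "api_2": TokenBucket(rate=100)}


    def call_api(name, request_fn):
        """Wait for the named API's limiter, then run the (hypothetical) request."""
        limiters[name].acquire()
        return request_fn()
    ```

    Inside a task body this would be used as `call_api("api_1", lambda: ...)`. If tasks run in separate processes or on separate machines, the bucket state would have to live in a shared store instead, which this sketch does not cover.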
  • j

    Johnathan Nguyen

    08/29/2022, 1:33 PM
    Hi Community, After starting a dask cluster, I have registered my flow by executing : 'python my_flow.py'. I got scheduler_address = gateway://dask-gateway-server:8000/74c3991658104aa68665c11ac52e767a After that I can execute the flow without problem. If I recreate a new dask cluster, I get a new scheduler_address gateway://dask-gateway-server:8000/fb67ee2be228422eafb66d2e73344e29 But when I execute the flow, it does not recreate a DaskExecutor with the new scheduler_address. Is there any way to refresh the DaskExecutor on a running flow? In my project, I don't want to re-register every time I start a new dask cluster. Thank you for your help. The script my_flow.py is in the thread:
    1️⃣ 1
    ✅ 1
    j
    a
    • 3
    • 8
  • s

    Scott White

    08/29/2022, 1:34 PM
    Have a problem triggering flow-runs from completed flows with variable parameters passed. I can trigger the flows just fine, but the parameters are not getting passed correctly. Can you share the code syntax to trigger a new flow with parameters using
    prefect<2.0
    and the functional API? I will share the pseudo-code and syntax in a reply.
    ✅ 1
    j
    • 2
    • 4
  • j

    José Duarte

    08/29/2022, 1:47 PM
    Prefect seems to be “eating” my logs, as in, any logs I issue don’t appear when running flows
    import prefect
    import logging
    logger = logging.getLogger(__name__)
    
    @prefect.flow
    def f():
        logger.info("test") # doesn't appear
    What am I doing wrong?
    ✅ 1
    s
    j
    • 3
    • 3
  • j

    José Duarte

    08/29/2022, 2:23 PM
    Another question: is it possible to launch a deployment and register it as a subflow? I am using an adapted version of this code: https://prefect-community.slack.com/archives/CL09KU1K7/p1660916071451349?thread_ts=1660903250.753849&cid=CL09KU1K7 Being able to register the deployment launch as a subflow would be very useful. Similar solutions are appreciated
    ✅ 1
    a
    • 2
    • 2
  • m

    Michael Law

    08/29/2022, 3:54 PM
    Hey folks, in Prefect 2.0, is there any way to use a method like Prefect 1.0 'set_upstream' but on a subflow? The reason I ask is that there is a convenience method in the 'prefect-databricks' library that fires a job and monitors its status until it completes, but it is a @flow instead of a @task. Just wondering if it is possible to set many flows to be upstream of a flow?
    j
    j
    • 3
    • 7
  • i

    Ilya Galperin

    08/29/2022, 3:55 PM
    Hi all - we are working on implementing a solution to send a Slack notification on flow failures with prefect 2.0. It seems like one way to do this is to capture the flow state via the following:
    @flow(name="always-fail")
    def entrypoint():
        raise ValueError("my error message")
    
    if __name__ == "__main__":
        state = entrypoint(return_state=True)
        print(state)
        print(state.result)
    In this scenario,
    state.result
    can give us the detailed error message for passing back into Slack which seems really useful. However, it’d also be great if we can also provide a direct link to the flow using the flow’s run id. How are other folks implementing this, and is there a way to directly access the flow’s run id after calling the flow from main (or otherwise)?
    ✅ 1
    m
    • 2
    • 5
  • j

    José Duarte

    08/29/2022, 4:07 PM
    I’ve attempted the solution in https://prefect-community.slack.com/archives/CL09KU1K7/p1661780834915459 However the regular Python logger will now raise:
    prefect.exceptions.MissingContextError: Logger 'dystematic.pdbt.ftp' attempted to send logs to Orion without a flow run id. The Orion log handler can only send logs within flow run contexts unless the flow run id is manually provided.
    This isn’t covered in the post nor the docs. The reason I want to use the regular Python logs is because I cannot use the Prefect one when launching from the console as I get:
    RuntimeError: There is no active flow or task run context.
    ✅ 1
    👀 1
    b
    a
    • 3
    • 7
  • t

    Tomás Emilio Silva Ebensperger

    08/29/2022, 4:39 PM
    For some reason, the artifacts create_markdown is rendering the markdown, but it is ignoring the images. Has anyone had this issue?
    1️⃣ 2
    ✅ 1
    b
    • 2
    • 9
  • n

    Nick Coy

    08/29/2022, 4:41 PM
    Hello, I am testing out prefect 2.0 and I have a GCS remote storage block enabled for a deployment. When I start an agent and then run the flow from the cloud I keep getting this warning
    UserWarning: Block document has schema checksum sha256:dbeeaf09aa78947a7c576549b11e098c00a25bcbbf90b8d8b70c0c3a3fc8f4a2 which does not match the schema checksum for class 'Process'. This indicates the schema has changed and this block may not load.
    When I go to the bucket I see the flow files there and the flow runs fine.
    ✅ 1
    j
    n
    • 3
    • 6
  • n

    Numline1

    08/29/2022, 6:20 PM
    Hey guys, so I managed to do my first deployment (:party-parrot:) but the scheduled tasks are failing with this weird error 😞 Any help appreciated
    k
    s
    • 3
    • 7
  • n

    Numline1

    08/29/2022, 6:20 PM
    also pls mind I'm more oriented towards the devops side of things, so I'm not 100% sure how all the python stuff works yet. The prefect install has its own virtualenv, the script should run in the same venv (for now)
    ✅ 1
    n
    • 2
    • 2
  • m

    Maggie Dart-Padover

    08/29/2022, 7:25 PM
    This is probably a dumb question, but is it possible to change or unregister a task when I'm developing a script in interactive mode? When I define a task it works fine, but if I edit the function for the task slightly and try to run the code to define it again I get the error
    OSError: could not get source code
    .
    del the_task
    does remove it from my environment, but then if I try to run the task definition again I still get the error. I'm not totally sure if this is a bug or expected behavior. Is my workflow here just wrong for prefect?
    ✅ 1
    a
    • 2
    • 3
  • s

    Sam Garvis

    08/29/2022, 7:27 PM
    When I run this
    prefect deployment build name.name -n name_dev -t dev_wq_k8s -sb gcs/dev --work-queue=dev_wq_k8s -ib kubernetes-job/dev-k8s-job
    I get this error
    Job is missing required attributes at the following paths: /apiVersion, /kind, /metadata, /spec (type=value_error)
    Even though in the Prefect 2.0 UI for creating a k8s job block, it says
    Job (Optional)
    ✅ 1
    👀 1
    • 1
    • 2
  • a

    ash

    08/29/2022, 8:41 PM
    Hello everyone, I am using prefect server deployed on K8s and using git as storage. I am using a flow with a date string as a parameter, shown in image 1, and getting the error shown in image 2. Can anyone guide or help me on how to resolve this error?
    1️⃣ 1
    c
    • 2
    • 3
  • r

    Richard Freeman

    08/29/2022, 10:30 PM
    does someone have experience with Prefect Disaster Recovery? What are the best practices to restore the workflow and also the history of the runs?
    ✅ 1
    a
    • 2
    • 3
  • n

    Nace Plesko

    08/29/2022, 11:04 PM
    Hi all! I'm just getting into Prefect and trying to understand how everything works. I'm not starting a prefect project from scratch, but trying to build on top of the existing one (mainly working on fixes now). The issue I'm running into right now that I can't find the answer to online is that
    ShellTask
    is not logging all the outputs from the script that it's running. I am looking at https://docs-v1.prefect.io/api/latest/tasks/shell.html#shelltask and seems like just setting
    stream_output=True
    should do the job, but I'm still seeing just
    Command failed with exit code 1
    . I also tried setting
    return_all=True
    and
    log_stderr=True
    , but still the same behavior. Has anyone run into this issue in the past?
    ✅ 1
    m
    • 2
    • 5
  • w

    Walter Cavinaw

    08/30/2022, 1:01 AM
    I am coming across this error. We are using saturn cloud to manage kubernetes. I have reached out to them, but hoping someone here can point me in a direction I can start debugging. This is prefect 1.0 still (we have put time aside to migrate before the end of the year). The container seems to terminate mid-task
    c
    • 2
    • 1
  • y

    Yousef Hosny

    08/30/2022, 1:47 AM
    Hi all! Does anyone know why
    deployment.apply()
    doesn't work from a jupyter notebook cell, but does work from
    .py
    script file? Also, is there any way to deploy a flow to prefect cloud using a jupyter notebook?
    ✅ 1
    j
    j
    • 3
    • 8
  • t

    Tommy Nam

    08/30/2022, 2:22 AM
    Hey guys, quick Q. Just wondering if anybody knew how stable the deployment YAML for agents would be going forward in Prefect 2.0. I know there were some changes in the past few weeks, but was wondering how often, if ever, the deployment YAML for agents will change in the future as we are planning to put our deployment YAMLs for agents in source control yet we would like to avoid having to manually reconfigure things if they change between updates. Any assistance in the matter would be greatly appreciated
    ✅ 1
    c
    a
    • 3
    • 2
  • a

    Anat Tal Gagnon

    08/30/2022, 4:17 AM
    Hi, I'm running a flow run that downloads a file to local file system within docker container and getting [Errno 13] Permission denied. is there best practice to bypass this?
    ✅ 1
    c
    • 2
    • 7
  • f

    Faheem Khan

    08/30/2022, 7:33 AM
    Prefect 2.0.4, dask, prefect UI displays logs from flows and subflows only, I have set loggers in task but those logs are not showing up in UI, I am using info logs only
    ✅ 1
    c
    • 2
    • 5
  • a

    Anat Tal Gagnon

    08/30/2022, 8:25 AM
    Hi guys, in order to transfer big files between 2 saas systems I have to download locally first? and if so within Docker container is there a best practice?
    👀 1
    b
    • 2
    • 1
  • a

    Adrien Besnard

    08/30/2022, 9:12 AM
    Hello! I have two small questions related to blocks: • Even if the
    prefect_dbt
    and
    prefect_airbyte
    collections exist, I was wondering if it makes sense to have something like a dedicated block for Airbyte (which allows us to store the
    server_url
    but also invoke a
    trigger_sync
    function) and the same question with a DBT block (and we store all the information that we can find in the
    profiles.yml
    , for example)? • What is the best way to log from a function of a
    Block
    subclass? Can we use the
    get_run_logger()
    or is it preferable to use some sort of callbacks that are going to be invoked inside a
    @task
    or something? Thanks!
    ✅ 1
    a
    • 2
    • 1
  • a

    Aditya Sharma

    08/30/2022, 10:47 AM
    Hi #prefect-community, getting my hands dirty on installing prefect for certain flows... As per the requirement we are supposed to use GraphQL agent. which i have no idea how to integrate into the local prefect system!! has anybody Worked on this? or any idea how to move forward?
    🆘 1
    ✅ 1
    a
    • 2
    • 3
  • p

    Parwez Noori

    08/30/2022, 11:20 AM
    Hi, has anyone experienced the following error that the Kubernetes Jobs cannot pull an public image. It gives the following error: Inserted in the thread.
    ✅ 1
    c
    • 2
    • 16
  • t

    Tarek

    08/30/2022, 12:59 PM
    Hi all, I am using v2.1.1 and deployed the Prefect UI to Kubernetes via the Helm chart. For some reason all the blocks and deployments disappeared from the UI after a while, not immediately. Could this be a Prefect issue, or some configuration in the Helm chart? Thanks.
    n
    j
    • 3
    • 3
  • j

    Josh Paulin

    08/30/2022, 1:24 PM
    Is there any way to attach some kind of listener to a flow as it changes states? In Prefect 1 I would have done this with state handlers, but those don’t seem to be an option if I’m looking to monitor the flow status instead of task status. My use case is to sync the Flow status with our legacy (homegrown) orchestrator to help perform the migration, so it can almost be thought of as custom notifications.
    ✅ 1
    c
    n
    a
    • 4
    • 7
  • a

    Anat Tal Gagnon

    08/30/2022, 1:39 PM
    Hi again, I'm trying to start flow run from REST API with Postman - but it's stuck in status PENDING. The agent started but it's not picking it up.... any ideas ??? http://localhost:4200/api/deployments/aa64fe41-a2f2-43c3-80f8-38e9d5c1ad26/create_flow_run
    ✅ 1
    n
    a
    • 3
    • 30
  • t

    Tim Enders

    08/30/2022, 1:56 PM
    Well, shoot. How do I solve the lovely
    _pickle.PicklingError: Pickling client objects is explicitly not supported.
    Clients have non-trivial state that is local and unpickleable.
    Pickle error on clients in Prefect 2.0?
    a
    • 2
    • 8
a

Anna Geller

08/30/2022, 2:47 PM
I think we would need more info here - at what point did you get that error? does it come from Dask?
t

Tim Enders

08/30/2022, 3:13 PM
Currently getting API connection errors, I will follow up when I can.
OK, I am using the ConcurrentTaskRunner and passing a client created in the Flow into the task. I do that earlier and it works, but the second (
bqclient
) throws the pickling error.
def main(full_active_load=False, full_load=False):
    logger = get_run_logger()
    token = os.environ.get("PSH_API_TOKEN")
    client = AccountsClient(
        token,
        token_url="REDACTED",
        api_url="REDACTED",
        client_id="REDACTED",
    )

    dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
    query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
    highwater = query_highwater(query)

    params = get_params(highwater, full_active_load, full_load)

    pages_list = get_pages_list(client, "subscriptions", params)
    items = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), pages_list[:100]
    )
    sub_data = []
    for item in items:
        sub_data.extend(item.result())
    if not sub_data:
        return Completed(message="No data from the API")
    else:
        transformed_data = transform_data(sub_data)

    bqclient = bigquery.Client()
    bq_result = load_df_bq(bqclient, transformed_data)

    if isinstance(bq_result, LoadJob) and bq_result.state == "DONE":
        return Completed(message="Load Finished!")
    elif isinstance(bq_result, Failed):
        return bq_result
    else:
        return Failed(message="Load Failure")
a

Anna Geller

08/30/2022, 4:24 PM
ahh gotcha - we have a work-in-progress item to allow disabling result persistence which would help you here, but it's not released yet
but I definitely understand your issue and this is a high priority item to tackle early on in September
t

Tim Enders

08/30/2022, 4:25 PM
Thank you, Do you have an issue on Github I can watch?
a

Anna Geller

08/30/2022, 4:28 PM
no, but we'll release that soon for sure, super high priority item, we'll say more in release notes once released + add docs on that
t

Tim Enders

08/30/2022, 4:30 PM
thanks
🙌 1
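A possible workaround for the pickling error in this thread, sketched under assumptions: the reply about result persistence suggests the error comes from pickling the task's return value, and a returned job object that holds a reference to its client would fail in exactly that way. Returning plain data from the task instead avoids pickling the client. `FakeClient` and `FakeJob` below are hypothetical stand-ins, not the BigQuery classes, and the sketch does not use Prefect itself.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for an API client that refuses to be pickled."""

    def __getstate__(self):
        raise pickle.PicklingError("Pickling client objects is explicitly not supported.")

    def load(self, rows):
        return FakeJob(self, len(rows))


class FakeJob:
    """Hypothetical job object that keeps a reference to its client."""

    def __init__(self, client, output_rows):
        self.client = client
        self.state = "DONE"
        self.output_rows = output_rows


def load_returning_job(client, rows):
    """Returns the job object, which drags the unpicklable client along."""
    return client.load(rows)


def load_returning_summary(client, rows):
    """Returns only plain data, so the return value can be pickled."""
    job = client.load(rows)
    return {"state": job.state, "output_rows": job.output_rows}


def is_picklable(value):
    """Report whether `value` survives pickle.dumps."""
    try:
        pickle.dumps(value)
        return True
    except pickle.PicklingError:
        return False
```

With this shape, the flow would check `summary["state"] == "DONE"` instead of inspecting the job object. Whether this removes the error in the original flow depends on where Prefect actually pickles, which the thread does not confirm.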