prefect-community
  • c

    Chris Keeley

    04/11/2022, 1:09 PM
    Hi, we have a requirement to trigger a flow in tenant A after another flow running in tenant B completes. This doesn't seem to be how Prefect is designed to work (?), but other requirements have made this tenant architecture necessary. Is there any official support for this pattern, or will I have to write my own connector? I have looked at borrowing from the
    wait_for_flow_run
    method used for creating a flow of flows, but that seems designed to work within a single tenant.
    k
    a
    • 3
    • 12
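    A hedged sketch of the cross-tenant variant of this pattern, assuming Prefect 1.x: a task in the tenant-B flow builds a Client authenticated with an API key for tenant A and starts the downstream flow run there. The secret name and flow ID are hypothetical, not an official connector.
    from prefect import Client, Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def trigger_flow_in_other_tenant(api_key: str) -> str:
        # Authenticate a client against tenant A explicitly instead of relying
        # on the locally configured tenant.
        other_tenant_client = Client(api_key=api_key)
        return other_tenant_client.create_flow_run(
            flow_id="<flow-id-in-tenant-A>",  # hypothetical flow ID
        )

    with Flow("cross-tenant-trigger") as flow:
        api_key = PrefectSecret("TENANT_A_API_KEY")  # hypothetical secret name
        trigger_flow_in_other_tenant(api_key)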
  • c

    Claire Herdeman

    04/11/2022, 1:39 PM
    Hi, I have a flow of flows where I'm launching two instances of the same flow with different parameters. This pipeline runs every day, but occasionally (about once every two weeks or so), one fails to launch with the following error:
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
    Is there a reliable way to avoid this, or a best practice for launching multiple versions of a flow?
    k
    • 2
    • 2
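    One hedged way to sidestep the RegisterTaskDefinition throttle is to register the task definition once and point ECSRun at its ARN, so concurrent child runs don't each register a new revision; the ARN below is hypothetical.
    from prefect.run_configs import ECSRun

    # Reusing a pre-registered task definition avoids calling
    # RegisterTaskDefinition for every concurrent flow run.
    run_config = ECSRun(
        task_definition_arn="arn:aws:ecs:us-east-1:123456789012:task-definition/my-flow:3",  # hypothetical ARN
    )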
  • r

    Roger Webb

    04/11/2022, 1:43 PM
    Using the LocalDaskExecutor... is there a way to limit the number of concurrently running tasks to a specific number? Given the schematic a flow shows based on its dependencies, we need to limit execution across the entire flow to only 20 tasks running at a time. Is there a configuration I'm not seeing?
    k
    • 2
    • 1
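    A minimal sketch, assuming the limit only needs to apply within a single flow run: LocalDaskExecutor's num_workers caps how many tasks execute at the same time.
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def process(x):
        return x * 2

    with Flow(
        "limited-concurrency",
        # num_workers caps how many tasks run at once within this flow run.
        executor=LocalDaskExecutor(scheduler="threads", num_workers=20),
    ) as flow:
        process.map(list(range(100)))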
  • j

    Jelle Vegter

    04/11/2022, 2:14 PM
    Hi, do jobs sent to a local agent run in the terminal the agent is listening from? Meaning, can the job access CLI authentication and environment variables?
    k
    a
    • 3
    • 11
  • n

    Nico Neumann

    04/11/2022, 2:39 PM
    Hey! I'm trying to interact with the GraphQL API and I want to set the default parameters for a flow group. What is the right way to pass the parameters (as a dict/JSON) into the query?
    :discourse: 1
    k
    • 2
    • 7
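    A hedged sketch: pass the parameters as a GraphQL variable rather than string-formatting them into the query. The mutation name set_flow_group_default_parameters and its input shape are assumptions here; verify them against the interactive schema.
    from prefect import Client

    client = Client()
    client.graphql(
        """
        mutation($flow_group_id: UUID!, $parameters: JSON!) {
          set_flow_group_default_parameters(
            input: { flow_group_id: $flow_group_id, parameters: $parameters }
          ) {
            success
          }
        }
        """,
        # Mutation/field names assumed; the key point is that the parameters
        # are passed as a plain dict via variables, not as a JSON string.
        variables={
            "flow_group_id": "<flow-group-id>",  # hypothetical ID
            "parameters": {"env": "dev", "limit": 100},
        },
    )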
  • c

    Chris Martinez

    04/11/2022, 2:59 PM
    Hi, good morning! Is anyone familiar with dropping the following labels from a job/task created by the Kubernetes agent:
    prefect.io/flow_id=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx
    prefect.io/flow_run_id=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxx
    prefect.io/identifier=xxxxxxxx
    :discourse: 1
    k
    • 2
    • 7
  • s

    Stephen Lloyd

    04/11/2022, 3:39 PM
    I’d like to use a Secret stored in Prefect Cloud in my run config. I have stored the secret using
    >>> from prefect import Client
    >>> client = Client()
    >>> client.set_secret(name="test", value="test")
    >>> client.set_secret(name="AWS_ACCOUNT_ID-dev", value="123456789098")
    It seems like I should be able to use
    from prefect.client import Secret
    
    aws_account_id = Secret('AWS_ACCOUNT_ID-' + RUN_ENV).get()
    However, I receive the following error:
    ValueError: Local Secret "AWS_ACCOUNT_ID-dev" was not found.
    How can I retrieve a secret value outside of a task?
    k
    • 2
    • 9
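    A hedged sketch of one way to read the Cloud secret outside a task, assuming the session is authenticated against Prefect Cloud: disable local secrets so Secret.get() queries the Cloud API instead of looking in the local config.
    import prefect
    from prefect.client import Secret

    # With use_local_secrets=False (or PREFECT__CLOUD__USE_LOCAL_SECRETS=false),
    # Secret.get() asks Prefect Cloud for the value instead of context.secrets.
    prefect.config.cloud.use_local_secrets = False

    RUN_ENV = "dev"
    aws_account_id = Secret("AWS_ACCOUNT_ID-" + RUN_ENV).get()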
  • a

    Atsushi Saito

    04/11/2022, 4:14 PM
    Hi, which Python path is used for
    LocalRun
    ? Can I use another virtual environment's Python, e.g. a pyenv virtualenv or a conda environment?
    :discourse: 1
    m
    • 2
    • 3
  • m

    Matt Delacour

    04/11/2022, 4:18 PM
    Hi everyone 👋 If I wanted to try Prefect now for my company, should I go with Prefect 1 or Prefect 2? I'm aiming to build something in production within the next 2 months.
    a
    • 2
    • 2
  • j

    Josh

    04/11/2022, 4:22 PM
    I’m getting intermittent errors about a Prefect Secret not being set in the cloud for my tenant. It happens every once in a while, and when I re-run the flow, it succeeds. Any idea what is causing the problem and what I can do to make sure it stops being flaky?
    Exception raised while calling state handlers: KeyError('The secret SLACK_PREFECT_NOTIFICATIONS_WEBHOOK_URL was not found.  Please ensure that it was set correctly in your tenant: https://docs.prefect.io/orchestration/concepts/secrets.html')
    k
    a
    • 3
    • 7
  • a

    Anders Segerberg

    04/11/2022, 4:27 PM
    For idempotency keys: I want to use Prefect templating in conjunction with a runtime variable. Will
    create_flow_run(..., idempotency_key="{date}-"+MY_VAR)
    function as expected? It's hard to tell, because I don't know yet how Prefect compiles the string templating as part of tasks. Would I need to generate this string as a task result, and then pass the result to the kwarg?
    k
    • 2
    • 2
  • a

    Atsushi Saito

    04/11/2022, 5:26 PM
    I have a question about the
    late runs
    icon in the Cloud UI: the Cloud UI shows neither success (green status) nor failure (red status). Is this an issue with docker login, or are there other possible causes?
    k
    • 2
    • 4
  • n

    Naimesh Chaudhari

    04/11/2022, 5:28 PM
    Hi all - I am trying to get Prefect to work on Kubernetes hosted on AWS EKS. I can access the UI, and it in turn is able to access the GraphQL pod. I am having an issue where whenever I create a project, it fails. Can anyone help resolve this? Is it related to the RDS PostgreSQL database I connected? How would I go about debugging this?
    k
    • 2
    • 32
  • s

    Sam Garvis

    04/11/2022, 5:45 PM
    Does anyone know how to register a flow with DockerRun and GitHub storage, but, instead of pointing the GitHub storage at a Python file, point it at an .ipynb? I looked into Prefect's ExecuteNotebook and didn't find anything useful to me. I also tried tagging the ipynb cells with the tag parameter, but I don't think that worked. It's not a huge deal, but it would be nice to make the .ipynb the GitHub storage instead of writing a Python file for every notebook.
    k
    • 2
    • 3
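    A hedged sketch, assuming GitHub storage still points at a small .py wrapper (storage needs a flow definition file) while the notebook itself is executed with prefect.tasks.jupyter.ExecuteNotebook; the repo, image, and notebook paths are hypothetical.
    from prefect import Flow
    from prefect.run_configs import DockerRun
    from prefect.storage import GitHub
    from prefect.tasks.jupyter import ExecuteNotebook

    run_notebook = ExecuteNotebook()

    with Flow(
        "notebook-flow",
        storage=GitHub(repo="my-org/my-repo", path="flows/notebook_flow.py"),  # hypothetical repo/path
        run_config=DockerRun(image="my-org/notebook-runner:latest"),  # hypothetical image
    ) as flow:
        # papermill executes the notebook; parameters are injected into a
        # parameters-tagged cell.
        run_notebook(path="notebooks/report.ipynb", parameters={"run_date": "2022-04-11"})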
  • a

    Anders Segerberg

    04/11/2022, 6:03 PM
    I am trying to test caching behavior. I've run a flow to completion, and then marked it as failed, so that I can restart it. I have a log that looks like this:
    13:53:26 INFO CloudFlowRunner  Flow run SUCCESS: all reference tasks succeeded
    13:55:13 INFO                  User marked flow run as Failed
    13:55:21 INFO                  null restarted this flow run
    13:55:22 INFO agent            Submitted for execution: Container ID: <id>
    13:55:22 INFO S3               Downloading flow from s3://<key>
    13:55:23 INFO S3               Flow successfully downloaded. ETag: <>, LastModified: <> VersionId: <>
    13:55:23 INFO CloudFlowRunner  Beginning Flow run for <pipeline>
    13:55:23 INFO CloudFlowRunner  Flow run SUCCESS: all reference tasks succeeded
    I would expect the flow to re-run entirely. However, it doesn't look like it does (it just jumps straight into Success). Is this due to Prefect's default input caching? To be specific, this flow is a flow-of-flows. I have two subflows
    k
    a
    • 3
    • 3
  • n

    Naimesh Chaudhari

    04/11/2022, 6:12 PM
    OK, second question - I'm trying to register a flow using the Python library: prefect_client = Client(api_server=prefect_url). I'm getting the error "You have not set an API key for authentication." How would I get an API key?
    k
    • 2
    • 11
  • d

    David Haynes

    04/11/2022, 6:20 PM
    Does anyone know the preferred way to get the state of a Flow after execution? I tried to write a terminal_state_handler but cannot find how to import the 'State' module. Is this even the correct approach, or is there a better way?
    a
    k
    • 3
    • 16
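    A minimal sketch of the terminal_state_handler approach, assuming Prefect 1.x; State imports from prefect.engine.state, and flow.run() also returns the final State directly.
    from typing import Optional, Set

    from prefect import Flow, task
    from prefect.engine.state import State

    def my_terminal_state_handler(
        flow: Flow, state: State, reference_task_states: Set[State]
    ) -> Optional[State]:
        # Inspect (or replace) the flow's final state here.
        print(f"Flow finished in state: {state}")
        return state

    @task
    def do_work():
        return 1

    with Flow("terminal-state-demo", terminal_state_handler=my_terminal_state_handler) as flow:
        do_work()

    final_state = flow.run()  # flow.run() returns the final State as well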
  • k

    Karim Zaghw

    04/11/2022, 6:27 PM
    Hi everyone! I just downloaded Prefect 2.0 and was wondering if it's possible to create a dependency between two different flows. The reason I'm asking is that I want each flow to run in a different conda environment, so each needs its own deployment, but I also want a flow to be able to run depending on the state of another flow in a different environment. Any help would be really appreciated. Thanks!
    a
    • 2
    • 20
  • n

    Naimesh Chaudhari

    04/11/2022, 6:33 PM
    How do I debug why my flow runs are sitting in the Submitted state?
    a
    k
    • 3
    • 16
  • v

    Vaikath Job

    04/11/2022, 7:19 PM
    Hi, new here and apologies if this has been asked. I have tried searching this Slack and online for answers, and the closest thing I could find was the flow-of-flows post. The scenario I'm trying to deal with is the following:
    1. There are two flows (A and B). I maintain B and another project team maintains A.
    2. I cannot run flow A, as it generates data for the company at large for multiple projects to use.
    3. I need my flow to run after the latest successful flow run of A (this happens on a roughly weekly basis but can sometimes be delayed), with updated parameters.
    Given the above, what is the best pattern to implement in Prefect? I have written a function that queries the GraphQL API and can return the result from the final state of the latest successful flow run of flow A. Is using this result the best way to proceed? The other team is willing to change the way their flows are run and what data/metadata their flows generate. I just cannot set it up as a flow of flows because they do not want other teams running their flows.
    ✅ 1
    k
    • 2
    • 4
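    A hedged sketch of the polling approach described above, assuming Prefect 1.x Cloud/Server and a hypothetical flow name: query the GraphQL API for the most recent successful run of flow A and branch on its end_time.
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        query($flow_name: String!) {
          flow_run(
            where: { flow: { name: { _eq: $flow_name } }, state: { _eq: "Success" } }
            order_by: { end_time: desc }
            limit: 1
          ) {
            id
            end_time
          }
        }
        """,
        variables={"flow_name": "flow-a"},  # hypothetical flow name
    )
    latest_run = result["data"]["flow_run"][0]  # most recent successful run of flow A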
  • t

    Tony Yun

    04/11/2022, 7:48 PM
    Hi, is there a way to turn a flow schedule on/off by running a CLI command?
    k
    a
    • 3
    • 6
  • a

    Anders Segerberg

    04/11/2022, 8:57 PM
    Are the date/times in the Prefect format strings returned as UTC?
    :discourse: 1
    k
    • 2
    • 1
  • v

    Vaikath Job

    04/11/2022, 11:09 PM
    GraphQL API Python client syntax question: I'm trying to query a flow run by filtering on the value of the parameters. I cannot figure out how to pass a JSON blob object to the query using Python. When I use the json.dumps method, Python produces a string, and as a result I get a bad-request error. I'm not sure how to create a jsonb-type parameters object, and I was hoping someone might be able to point me in the right direction.
    :discourse: 1
    k
    a
    • 3
    • 5
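    A hedged sketch: instead of json.dumps-ing the blob into the query string, pass the dict through the variables argument with the jsonb type and let the client serialize it.
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        query($params: jsonb) {
          flow_run(where: { parameters: { _eq: $params } }, limit: 5) {
            id
            name
            state
          }
        }
        """,
        variables={"params": {"customer_id": 123}},  # hypothetical parameter values
    )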
  • e

    Eddie Atkinson

    04/12/2022, 2:35 AM
    This is a really silly question, but I can't quite work out the answer from Dask's and Prefect's documentation. My aim is to use Prefect to orchestrate flows with varying memory requirements on a Dask cluster. As an MVP I've set up an
    ECSRun
    flow using the
    LocalDaskExecutor
    with 30GB of RAM. For large jobs this flow OOMs and gets killed by the Prefect scheduler. My question is this: if I set up a Dask cluster to run these jobs, would it gracefully handle memory issues? That is to say, if I had 30GB of RAM in the cluster and a job that required 50GB, would Dask OOM or would it simply run slower? Do I need to modify my code to use Dask dataframes, or is there some smarts here I'm not quite across?
    :discourse: 1
    k
    • 2
    • 4
  • a

    Atsushi Saito

    04/12/2022, 5:19 AM
    This might be a silly question, but should
    labels
    be specified in both the running environment (Docker run or local run) and on the remote server side (i.e. Cloud UI or Server UI)?
    a
    • 2
    • 1
  • a

    Alexander Belikov

    04/12/2022, 8:59 AM
    Hi! I have a question about using two schedules in the flow of flows construct: I posted it here https://stackoverflow.com/questions/71839812/prefect-loop-over-flows-two-schedules
    a
    • 2
    • 8
  • r

    RAISS Zineb

    04/12/2022, 12:30 PM
    Hello everyone, I am a beginner with Prefect. In order to create a data pipeline, I want to connect it to a SQL Server DB that I have locally, so I used the documentation: https://docs.prefect.io/api/latest/tasks/sql_server.html#sqlserverexecute My questions are:
    - To configure the connection between SQL Server and Prefect, do I use the classes, put it all in a .py file, and then execute it?
    - If yes, do I need to import pyodbc in that .py code?
    - If not, how should I do it?
    - For the data args & **kwargs, what should be passed?
    - For the user's password, what argument should I use to provide it (pwd or something else)?
    :discourse: 2
    a
    • 2
    • 16
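    A hedged sketch of one way to wire this up, assuming a plain pyodbc connection inside a task (roughly what the SqlServerExecute task wraps) and the password stored as a Prefect Secret with a hypothetical name.
    import pyodbc
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def run_query(password: str) -> list:
        # Connection details are placeholders for the local SQL Server instance.
        conn = pyodbc.connect(
            "DRIVER={ODBC Driver 17 for SQL Server};"
            "SERVER=localhost;DATABASE=my_db;UID=my_user;"
            f"PWD={password}"
        )
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT TOP 10 * FROM my_table")  # placeholder query
            return cursor.fetchall()
        finally:
            conn.close()

    with Flow("sql-server-demo") as flow:
        password = PrefectSecret("SQL_PASSWORD")  # hypothetical secret name
        rows = run_query(password)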
  • c

    Constantino Schillebeeckx

    04/12/2022, 1:55 PM
    I'm having an issue with multiple layers of mapped tasks running out of the expected order 🧵
    :discourse: 1
    a
    • 2
    • 4
  • d

    Domenico Di Gangi

    04/12/2022, 2:31 PM
    Hi! Is it possible to define a task by decorating a class method in Orion with @task? Looking at the Orion docs, I don't see any examples that use class methods. Is this discouraged by design somehow?
    a
    • 2
    • 2
v

Vasco Leitão

04/12/2022, 2:48 PM
Hello! I'm exploring Prefect's integration with AWS and trying to extract values from AWS Parameter Store inside a task. From what I understand from the docs, I can call the run method of the AWSParametersManager class; however, I'm getting an error from the
defaults_from_attrs
helper decorator. What am I missing? 😅 (Code in the thread)
:discourse: 1
from prefect.tasks.aws.parameter_store_manager import AWSParametersManager

...
@task(name="Fetch Invoices from FTP Server")
def fetch_invoice_files() -> list:
    
    logger = prefect.context.get('logger')
    try:
        with paramiko.SSHClient() as s:
            s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            UPS_USERNAME = AWSParametersManager.run('ftp_user')
            UPS_PW = AWSParametersManager.run('ftp_password')
            s.connect(URL, 10022, username=UPS_USERNAME, password=UPS_PW)
    ...
    return files


...
with Flow(
    'parameter-test',
    run_config=flow_run_config,
) as flow:
    files = fetch_invoice_files()
    ...
Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/vascoleitao/Document/flows/ups_ftp_finance/flow.py", line 47, in fetch_invoice_files
    UPS_USERNAME = AWSParametersManager.run('ftp_user')
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 455, in method
    kwargs.setdefault(attr, getattr(self, attr))
AttributeError: 'str' object has no attribute 'parameter_name'
k

Kevin Kho

04/12/2022, 2:49 PM
Is
fetch_invoice_files
a task?
Ah I see. Can you try:
UPS_USERNAME = AWSParametersManager().run('ftp_user')
v

Vasco Leitão

04/12/2022, 2:56 PM
Yeah, it was that, though I needed to pass the argument by keyword to the
run
method:
UPS_USERNAME = AWSParametersManager().run(parameter_name='ftp_user')
However, I'm getting unexpected values. I should be getting the cleartext value of the parameter, right?
k

Kevin Kho

04/12/2022, 3:07 PM
What are you getting?
v

Vasco Leitão

04/12/2022, 3:11 PM
These two parameters are SecureString
print(UPS_USERNAME)
print(UPS_PW)
AQICAHjJcwEWLpsKGepQ9HpDWyYbuEDjdo5wDqcdcbs0xLhB4gGKDfIHfHZUA5CspbLvk+NKAAAAazBpBgkqhkiG9w0BBwagXDBaAgEAMFUGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMG79ax+1uE+LuAb7AAgEQgCiEqBR/t74O9x2qFLEJNAg

AQICAHjJcwEWLpsKGepQ9HpDWyYbuEDjdo5wDqcdcbs0xLhB4gFrFkanm+V4F1cko5UBGctcAAAAaTBnBgkqhkiG9w0BBwagWjBYAgEAMFMGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMTNnIXGejIWXNzUOYAgEQgCa
k

Kevin Kho

04/12/2022, 3:13 PM
I haven’t used it, but the source is pretty straightforward. Do you know how you’d do it in normal python?
v

Vasco Leitão

04/12/2022, 3:17 PM
Yeah. I'm going to try it out with the boto3 SSM client and see if the problem still holds. Thank you for the swift response! If I get more issues on this, I'll update the thread.
👍 1
I think I understand what the issue is here: SecureString parameters require decryption to be read. The boto3 SSM client has an option to decrypt the value (docs), while AWSParametersManager does not yet.
👍 1
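A hedged sketch of reading the SecureString directly with boto3, assuming AWS credentials are available in the flow's environment; the parameter name is the one used above.
import boto3

ssm = boto3.client("ssm")
response = ssm.get_parameter(Name="ftp_user", WithDecryption=True)
value = response["Parameter"]["Value"]  # decrypted plaintext value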
k

Kevin Kho

04/12/2022, 3:36 PM
Ah you can make your own task then (override the run method). Pretty sure we’d welcome a PR to make this configurable also if you’re interested 🙂
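A minimal sketch of that suggestion: a small Task subclass whose run method decrypts the parameter with boto3 (illustrative only, not the task library's implementation).
import boto3
from prefect import Task

class SecureParameterFetch(Task):
    def run(self, parameter_name: str) -> str:
        # Fetch and decrypt a SecureString parameter from AWS SSM Parameter Store.
        ssm = boto3.client("ssm")
        response = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
        return response["Parameter"]["Value"]

# Inside a flow context:
# ftp_user = SecureParameterFetch()(parameter_name="ftp_user")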
a

Anna Geller

04/12/2022, 4:49 PM
https://discourse.prefect.io/t/how-to-use-the-awsparametersmanager-task/745