Powered by Linen
prefect-community

    Milton

    03/25/2022, 6:25 PM
    Hi there, we are running the Prefect agents in Kubernetes as per https://docs.prefect.io/orchestration/agents/kubernetes.html#running-in-cluster. We noticed that if we don't specify the agent name with --name, it defaults to the name Kubernetes. This is fine when you run only one replica, but when you increase the replica count to 2, both take the default name and the Prefect UI treats the two agents as the same agent. What is the recommended way to deploy multiple agents in Kubernetes for high availability?
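One way to keep replicas distinguishable is to give each one its own --name. In Kubernetes a container's hostname defaults to its pod name, which is unique per replica, so a minimal sketch (assuming the agent is launched from a small Python wrapper; the `build_agent_command` helper and the "k8s-agent" prefix are hypothetical) could derive the name from it:

```python
import shlex
import socket


def build_agent_command(prefix="k8s-agent", hostname=None):
    """Build argv for a Prefect 1.x Kubernetes agent with a per-replica name.

    Hypothetical helper: the pod name (the container's default hostname in
    Kubernetes) makes each replica's agent name unique.
    """
    pod_name = hostname or socket.gethostname()
    return ["prefect", "agent", "kubernetes", "start", "--name", f"{prefix}-{pod_name}"]


if __name__ == "__main__":
    # Print a shell-quoted command line; a wrapper could instead call
    # os.execvp(argv[0], argv) to replace itself with the agent process.
    print(shlex.join(build_agent_command()))
```

Without a wrapper, the same effect can usually be had in the Deployment spec by exposing the pod name as an environment variable through the downward API (fieldRef with fieldPath: metadata.name, under a variable name of your choosing such as POD_NAME) and passing --name $(POD_NAME) in the container args.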

    Wei Mei

    03/25/2022, 7:12 PM
    Hello, hopefully a simple question. When my code uses password=Secret("PREFECT_PW").get(), how do I test it when I run it locally?
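As I understand Prefect 1.x, local runs can resolve Secret(...).get() from the local context, which is populated from environment variables named PREFECT__CONTEXT__SECRETS__<NAME> (whether local secrets are consulted is governed by the cloud.use_local_secrets setting). The pure-Python sketch below only mimics that naming convention so it can be checked in isolation; `local_secret` is a hypothetical helper, not part of Prefect:

```python
import os


def local_secret(name, environ=None):
    """Mimic the PREFECT__CONTEXT__SECRETS__<NAME> lookup for local testing.

    Hypothetical helper; Prefect's own Secret(name).get() does the real lookup.
    """
    env = os.environ if environ is None else environ
    key = f"PREFECT__CONTEXT__SECRETS__{name}"
    try:
        return env[key]
    except KeyError:
        raise KeyError(f"Set {key} to test {name!r} locally") from None


if __name__ == "__main__":
    # Shell equivalent: export PREFECT__CONTEXT__SECRETS__PREFECT_PW="dummy-password"
    os.environ.setdefault("PREFECT__CONTEXT__SECRETS__PREFECT_PW", "dummy-password")
    print(local_secret("PREFECT_PW"))
```

With the variable exported before calling flow.run(), the original Secret("PREFECT_PW").get() call should pick up the value without any code changes.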

    Henry

    03/25/2022, 7:24 PM
    Hello. We upgraded from 0.15.14 to 1.1.0, and it seems our flows are still running, but nothing shows up on the dashboard unless we pick a project.

    Patrick Tan

    03/25/2022, 7:28 PM
    Hi, regarding the Prefect logger, is there any way to store logs in external storage, such as my own database, in addition to Prefect Cloud? For example:
    [2022-03-25 15:13:44-0400] INFO - prefect.TaskRunner | Task 'get_config['textlist_id']': Finished task run for task with final state: 'Success'
    [2022-03-25 15:13:44-0400] INFO - prefect.TaskRunner | Task 'query_from_opensearch': Starting task run...
    [2022-03-25 15:13:44-0400] INFO - prefect.TaskRunner | Task 'query_from_opensearch': Finished task run for task with final state: 'Success'
    [2022-03-25 15:13:44-0400] INFO - prefect.TaskRunner | Task 'from_s3': Starting task run...
    [2022-03-25 15:13:45-0400] INFO - prefect | Read from S3: 3
    [2022-03-25 15:13:45-0400] INFO - prefect.TaskRunner | Task 'from_s3': Finished task run for task with final state: 'Success'
    [2022-03-25 15:13:45-0400] INFO - prefect.TaskRunner | Task 'validate': Starting task run...
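The log lines above come from standard Python loggers ("prefect", "prefect.TaskRunner"), so one option is to attach an extra logging.Handler that also writes each record to your own database, leaving the Cloud handler in place. A minimal sketch with sqlite3 standing in for "my own database"; the `DatabaseLogHandler` class and the table layout are assumptions:

```python
import logging
import sqlite3


class DatabaseLogHandler(logging.Handler):
    """Copy log records into a SQL table; sqlite3 stands in for your own database."""

    def __init__(self, conn, level=logging.INFO):
        super().__init__(level)
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS flow_logs "
            "(created REAL, logger TEXT, level TEXT, message TEXT)"
        )

    def emit(self, record):
        # Handler.handle() already holds self.lock around emit(), so writes
        # coming from several task threads are serialized.
        try:
            self.conn.execute(
                "INSERT INTO flow_logs VALUES (?, ?, ?, ?)",
                (record.created, record.name, record.levelname, record.getMessage()),
            )
            self.conn.commit()
        except Exception:
            # A failed log write should never fail the task itself.
            self.handleError(record)


if __name__ == "__main__":
    # check_same_thread=False because records may arrive from worker threads.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    # Assumption: Prefect's loggers ("prefect", "prefect.TaskRunner", ...) share
    # the "prefect" parent, so one handler there sees all of them.
    prefect_logger = logging.getLogger("prefect")
    prefect_logger.setLevel(logging.INFO)
    prefect_logger.addHandler(DatabaseLogHandler(conn))
    logging.getLogger("prefect.TaskRunner").info("Task 'from_s3': Starting task run...")
    print(conn.execute("SELECT logger, level, message FROM flow_logs").fetchall())
```

The handler has to be attached in the process that actually runs the flow (for example at import time in the flow's module), not only in the process that registers it.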

    Danny Vilela

    03/25/2022, 7:29 PM
    Hey all! Simple question: if I raise an error within a task (e.g., ValueError), is it possible to recover that traceback/error from a state handler? Right now I can say “this task failed and will be retrying in X minutes”, but not “this task failed for reason Y and will retry in X minutes”. Or can we pass keyword arguments to a state handler with a signature like one of these:
    # What I have now.
    def notify_on_retry(task: Task, old_state: State, new_state: Retrying) -> State: ...
    
    # Maybe what I want?
    def notify_on_retry(task: Task, old_state: State, new_state: Retrying, message: str) -> State: ...
    
    # Alternative?
    def notify_on_retry(task: Task, old_state: State, new_state: Retrying, **kwargs) -> State: ...
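Prefect calls a state handler with three positional arguments, so one way to pass extra keyword arguments is to bind them beforehand with functools.partial. For the reason itself, my understanding of Prefect 1.x is that a task that raises first enters a Failed state whose .result holds the exception and only then transitions to Retrying, so on the Failed -> Retrying call, old_state should carry the error; that is an assumption worth checking in your version. In this sketch SimpleNamespace objects stand in for Task and State, and `channel`/`send` are hypothetical parameters:

```python
import functools
from types import SimpleNamespace


def notify_on_retry(task, old_state, new_state, channel="#alerts", send=print):
    """State-handler-shaped callable that also accepts extra keyword arguments.

    Assumption: on the Failed -> Retrying transition, old_state.result is the
    exception the task raised (e.g. a ValueError).
    """
    error = getattr(old_state, "result", None)
    if isinstance(error, BaseException):
        reason = f"{type(error).__name__}: {error}"
    else:
        reason = getattr(old_state, "message", None) or "unknown reason"
    retry_at = getattr(new_state, "start_time", "later")
    send(f"[{channel}] Task {task.name!r} failed ({reason}); retrying at {retry_at}")
    return new_state


# Bind the extra kwargs once; Prefect would then call handler(task, old, new).
handler = functools.partial(notify_on_retry, channel="#data-alerts")

if __name__ == "__main__":
    task = SimpleNamespace(name="validate")
    failed = SimpleNamespace(result=ValueError("bad row"), message="Error during execution")
    retrying = SimpleNamespace(start_time="2022-03-25T20:00:00Z")
    handler(task, failed, retrying)
```

In a real handler you would likely notify only when new_state.is_retrying() is true, and register it with something like @task(state_handlers=[handler], max_retries=3, retry_delay=...).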

    Harry Baker

    03/25/2022, 8:45 PM
    Is it possible to create a helper function/task that implements both create_flow_run and wait_for_flow_run? I tried:
    @task()
    def flow_run_helper(flow_name, project_name):
        cfr = create_flow_run(flow_name=flow_name, project_name=project_name)
        wfr = wait_for_flow_run(cfr, stream_logs=True, raise_final_state=True)
        return wfr
    but it's yelling at me about "ValueError: Could not infer an active Flow context while creating edge". My app does a lot of chaining of flows, so I wanted to streamline this.

    Lee Briggs

    03/25/2022, 9:27 PM
    I'm still wrapping my head around some concepts here, so bear with me. I've got a working prototype up and running that uses a Flask web app to dispatch flows to Prefect Cloud. Currently, I'm seeding the data (i.e., the tasks and flows to run) with a "bootstrap" script that needs to run before the web app is launched, but I'd really like it to be a "batteries included" experience. How are people generally solving this? I could have a "setup" page in my Flask app, but I don't want to get too involved for now.

    Dominic Pham

    03/26/2022, 12:22 AM
    Hi all, when I create a connection to a database using prefect.tasks.sql_server and create a global temp table, will the temp table persist after the flow is done? Or do I have to explicitly close the connection as a final task in the flow?
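On the SQL Server side, a global temporary table (##name) is dropped automatically when the session that created it ends and no other session is still referencing it, so its lifetime follows the connection rather than the flow. If the task that creates it closes its connection at the end of its run, the table may already be gone for downstream tasks. SQLite has no global temp tables, but its TEMP tables are scoped to a connection in a similar way, which makes it a convenient stand-in for showing why an explicit close matters; the file and table names are made up:

```python
import contextlib
import os
import sqlite3
import tempfile


def temp_table_visible_after_close(db_path):
    """Create a TEMP table on one connection, close it, then look from a new one."""
    # contextlib.closing guarantees conn.close(); sqlite3's own "with conn:"
    # only commits or rolls back and leaves the connection open.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TEMP TABLE staging (id INTEGER)")
        conn.execute("INSERT INTO staging VALUES (1)")
        # Visible inside the session that created it.
        assert conn.execute("SELECT COUNT(*) FROM staging").fetchone() == (1,)

    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute(
            "SELECT name FROM sqlite_temp_master WHERE name = 'staging'"
        ).fetchone()
    return row is not None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        print("visible after close:", temp_table_visible_after_close(path))
```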

    Chu Lục Ninh

    03/27/2022, 11:31 AM
    Hi @Kevin Kho, Orion is getting more attractive now. But the collections are only maintained by Prefect; can I contribute to the Orion task collections? Or is there a guide to creating my own task collection and sharing it with other developers?

    himanshu pandey

    03/27/2022, 2:06 PM
    Hello there, my name is Himanshu Pandey and I am a security researcher. I have read your privacy policy and have found a vulnerability in Prefect services. Please contact me ASAP. I hope you understand me. Thank you. Regards.

    LI LIU

    03/28/2022, 1:02 AM
    Hello, I am evaluating whether to use Airflow or Prefect for a side project that involves extracting data from an API and loading it into Snowflake. My understanding is that if I use Airflow, I need to use a cloud vendor offering such as AWS MWAA, which would incur some cost. In comparison, starting out on Prefect Cloud is free. Is my understanding correct? Thank you!

    Jeff Kehler

    03/28/2022, 8:31 AM
    I seem to be having a problem passing a Parameter into the class constructor (__init__) of a task subclassed from prefect.Task. I am receiving a prefect.Task object instead of the value of the Parameter itself. I can't figure out what I'm doing wrong.

    Michael Smith

    03/28/2022, 8:36 AM
    Hello, we are evaluating Prefect and have a particular use case in mind that requires us to (1) run high-level workflow steps in "Project A" and (2) run specific (smaller) steps of that workflow in "Project B". Is it possible to deploy an agent to each project so that Prefect manages the entire execution? (We currently perform cross-project RPC calls but would like to move away from this approach.)

    Jons Cyriac

    03/28/2022, 9:14 AM
    Hi, I am wondering where the logs and run histories are saved. Can anyone point me in the right direction? (I'm using Prefect Server as the backend.)

    Bennett Lambert

    03/28/2022, 10:55 AM
    Hi all, my company is testing Prefect and I am having trouble installing a Kubernetes agent, which I suspect is because of the company proxy. When I install the agent, the logs say it is being registered, but eventually I get an SSL error:
    requests.exceptions.SSLError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))
    Is there a way to provide proxy credentials when starting the agent? I've tried using the --env flag to supply the proxy information. Or is there something similar to --verify=false that I can set?
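As I understand Prefect 1.x, the agent's --env flag sets environment variables on the flow runs the agent submits, not on the agent process itself, while the SSL error above comes from the agent's own connection to api.prefect.io. requests (which raised the error) reads HTTPS_PROXY, NO_PROXY and REQUESTS_CA_BUNDLE from its process environment and accepts credentials embedded in the proxy URL, so one option is to set these on the agent container and pass them on with --env only if the flow runs need them too. Pointing REQUESTS_CA_BUNDLE at a bundle that contains the proxy's root certificate avoids disabling verification. A sketch that builds the entries in Kubernetes' name/value env format; the proxy URL, credentials and CA path are placeholders:

```python
import json


def proxy_env(proxy_url, ca_bundle, no_proxy=("localhost", "127.0.0.1")):
    """Env entries (Kubernetes name/value form) for running behind a proxy.

    proxy_url may embed credentials, e.g. http://user:password@proxy:8080.
    ca_bundle should contain the proxy's root CA so TLS verification passes.
    """
    values = {
        "HTTPS_PROXY": proxy_url,
        "HTTP_PROXY": proxy_url,
        "NO_PROXY": ",".join(no_proxy),
        # requests verifies TLS against this bundle instead of its default one.
        "REQUESTS_CA_BUNDLE": ca_bundle,
    }
    return [{"name": name, "value": value} for name, value in values.items()]


if __name__ == "__main__":
    entries = proxy_env(
        "http://user:password@proxy.example.com:8080",  # placeholder proxy
        "/etc/ssl/certs/corporate-ca.pem",  # placeholder; mount it into the pod
    )
    # These map onto the agent container's "env:" list in its Deployment manifest.
    print(json.dumps(entries, indent=2))
```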

    Rahul Kadam

    03/28/2022, 12:42 PM
    Hi team, we are currently using Prefect Core and are thinking about switching to Prefect Cloud. Can I get a link to documentation on the key advantages of the Cloud offering over the Core product? Also, compared to Core, how much can we still control in Prefect Cloud, and which specific things will be out of the end user's hands and fully managed by the Cloud offering?

    Florian Guily

    03/28/2022, 1:05 PM
    Hey, we are thinking about developing a simple MVP web app that needs to pull data from an external API and do a basic visualization. Do you think we can use Prefect as the EL tool? There is no real Load step, though: the flow would only parse/transform the response of the external API and pass it directly to the front end. I was thinking about triggering the EL flow by having the front end call the Prefect API, with the response being the data collected by the flow. Is that possible, or do you recommend another approach?

    Bennett Lambert

    03/28/2022, 2:13 PM
    We've run into another issue that I was hoping someone could help clarify. I am running a Dask cluster on an on-premise Kubernetes cluster, and the Kubernetes agent runs in the same namespace. The agent is connected to Prefect Cloud and the jobs run on the Dask cluster successfully. However, the agent never reports any runs. Is something else needed to connect the agent with flows executed on a Dask cluster?

    Dekel R

    03/28/2022, 2:44 PM
    Hey, I have a complex data extraction flow with 2 main cases. Each case has its own tasks, and some tasks are relevant to both. The layout of the flow looks roughly like this:
    with Flow('html_data_extraction__dev',
              storage=Docker(registry_url="us-central1-docker.pkg.dev/***/",
                             dockerfile="./Dockerfile"),
              schedule=daily_schedule, executor=LocalDaskExecutor(scheduler="processes")) as flow:
        mode = Parameter(name='mode', default=None)
        with case(mode, None):
            data_a = task_a
        with case(mode, 'onboard'):
            data_b = task_b

        data_c = merge(data_a, data_b)
        task_c
        task_d

        with case(mode, None):
            task_x
        with case(mode, 'onboard'):
            task_y
    Tasks a and b retrieve data (each is relevant to a different data source). Tasks c and d are common (not in a “case”) and do X on the data, which looks the same at this point. Tasks x and y then differ again, each relevant to a different case. When running locally (Mac, flow.run()...), it all works as expected. When running on Prefect Cloud, all of the tasks get skipped (exactly the same code and credentials). Any idea what I'm missing here? I'm using “upstream_tasks” to run the tasks in a specific order when necessary. Thanks

    kevin

    03/28/2022, 3:54 PM
    Hey guys, I have a flow in Prefect Cloud that's showing this error:
    {'_schema': 'Invalid data type: None'}
    after it is scheduled but before the first task gets executed. When I run this flow in a local environment, it executes as expected. Any idea what could be causing this issue?

    Michael Smith

    03/28/2022, 4:46 PM
    Hello, I am looking into Prefect 2.0 and wondering how we might implement a cleanup function. In Prefect 1, from what I have seen, we could use the "All Finished" trigger when defining the task; I can't see the equivalent in v2. Any suggestions?
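Because Prefect 2.0 flows are plain Python functions, one option (a sketch, not necessarily the idiomatic 2.0 answer while Orion is in beta) is an ordinary try/finally around the flow body, so the cleanup step runs whether the earlier steps succeed or fail, much like an all_finished trigger. Plain functions stand in for tasks here, and `run_with_cleanup`, `steps` and `cleanup` are hypothetical names. Depending on how task results are retrieved in the beta, a failed task may not raise inside the flow body, so that part is worth verifying:

```python
def run_with_cleanup(steps, cleanup):
    """Run steps in order; always run cleanup, then re-raise any failure.

    In a Prefect 2.0 flow, the try block would call your tasks and the
    finally block would call the cleanup task.
    """
    results = []
    try:
        for step in steps:
            results.append(step())
        return results
    finally:
        # Runs on success and on exceptions alike, similar in spirit
        # to Prefect 1's all_finished trigger.
        cleanup()


if __name__ == "__main__":
    def extract():
        return "rows"

    def load():
        raise RuntimeError("warehouse unavailable")

    try:
        run_with_cleanup([extract, load], cleanup=lambda: print("dropping temp tables"))
    except RuntimeError as exc:
        print("flow failed:", exc)
```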

    Myles Steinhauser

    03/28/2022, 4:48 PM
    I stumbled into the ECS Capacity Provider issue (just recently fixed by PR https://github.com/PrefectHQ/prefect/pull/5411). I am curious when the fix might be released for Prefect 1.0 agents. This isn't entirely blocking my team, but it is forcing us to work around it by selecting instance types through other methods.

    Ken Nguyen

    03/28/2022, 5:17 PM
    Hi! One of my flows uses GraphQL to pull logs from another flow. The log-pulling flow is triggered directly by the flow that produces the logs. I keep getting this error in the log-pulling flow:
    {'errors': [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}], 'data': None}
    When I re-run the flow later, it runs successfully. Are there any docs with information on API limits?
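Since the same query succeeds when re-run later, the error looks transient, so one mitigation (which assumes nothing about documented API limits) is to retry the GraphQL call with exponential backoff whenever the response reports 'Operation timed out'. `query_fn` is a hypothetical zero-argument callable wrapping your client call (for example one built on prefect.Client().graphql); the response shape matches the error above, and exceptions mentioning a timeout are retried too, since some clients raise on GraphQL errors instead of returning them:

```python
import random
import time


def is_timeout(payload):
    """True if a GraphQL response dict reports an 'Operation timed out' error."""
    errors = (payload or {}).get("errors") or []
    return any("timed out" in str(err.get("message", "")).lower() for err in errors)


def query_with_backoff(query_fn, attempts=5, base_delay=2.0, max_delay=60.0, sleep=time.sleep):
    """Call query_fn until it returns a non-timeout response or attempts run out."""
    problem = None
    for attempt in range(attempts):
        try:
            payload = query_fn()
            if not is_timeout(payload):
                return payload
            problem = payload
        except Exception as exc:
            if "timed out" not in str(exc).lower():
                raise  # not a timeout: fail fast
            problem = exc
        if attempt < attempts - 1:
            # Exponential backoff with full jitter: wait 0..min(max, base * 2**attempt)
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    raise RuntimeError(f"Query still timing out after {attempts} attempts: {problem}")
```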

    Harry Baker

    03/28/2022, 7:05 PM
    Is there a write-up/document about specifically what's new/different in Prefect 2.0? I don't really want to read through all of the docs from scratch, but a summary of the changes would be nice.

    Michael Smith

    03/28/2022, 7:44 PM
    Hello, my Prefect 2.0 tests are going well... I see that Flow has a timeout_seconds parameter; is there any way to set task-level timeouts as well?

    Myles Steinhauser

    03/28/2022, 8:36 PM
    Does Prefect 1.1 support retries on Flows? Specifically, I'm trying to work around some delayed scaling issues with ECS on EC2 instances (not ECS with Fargate tasks). Often, the failure is reported back to Prefect as the following error until Capacity Provider scaling has caught up again:
    FAIL signal raised: FAIL('a4f09101-0577-41ce-b8b0-31b84f26d855 finished in state <Failed: "Failed to start task for flow run a4f09101-0577-41ce-b8b0-31b84f26d855. Failures: [{\'arn\': \'arn:aws:ecs:us-east-1:<redacted>:container-instance/a8bc98b7c6864874bc6d1138f758e8ea\', \'reason\': \'RESOURCE:CPU\'}]">')
    I’m using the following calls to launch the sub-flows (as part of a larger script):
    flow_a = create_flow_run(flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
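One workaround is to retry the launch-and-wait step yourself, but only when the failure is the ECS capacity error shown above ('reason': 'RESOURCE:CPU'). A sketch under those assumptions: `launch` is a hypothetical zero-argument callable wrapping however you start and wait for the child flow run (it should raise on failure, as wait_for_flow_run(..., raise_final_state=True) does here), and the attempt count and delay are placeholders to tune. The regex accepts both plain and backslash-escaped quotes, since the message above is nested inside a repr:

```python
import re
import time

# Matches 'reason': 'RESOURCE:CPU' as well as the escaped \'reason\': \'RESOURCE:CPU\'
REASON_RE = re.compile(r"""\\?['"]reason\\?['"]\s*:\s*\\?['"]([^'"\\]+)""")


def ecs_failure_reasons(message):
    """Extract ECS 'reason' values (e.g. RESOURCE:CPU) from a failure message."""
    return REASON_RE.findall(message)


def is_capacity_failure(message):
    """True when every reported reason is a RESOURCE:* capacity shortfall."""
    reasons = ecs_failure_reasons(message)
    return bool(reasons) and all(r.startswith("RESOURCE:") for r in reasons)


def launch_with_capacity_retries(launch, attempts=4, delay=120, sleep=time.sleep):
    """Re-run launch() while it fails only because the cluster lacks capacity."""
    for attempt in range(1, attempts + 1):
        try:
            return launch()
        except Exception as exc:
            if attempt == attempts or not is_capacity_failure(str(exc)):
                raise
            sleep(delay)  # give Capacity Provider scaling time to catch up
```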

    Alex Prokop

    03/28/2022, 8:39 PM
    Hi there, I'm trying to implement some custom logic to handle retries. When a task times out or fails, I'd like to spawn new tasks using the arguments passed to the original task. And if those tasks succeed, I'd like to change the original task's state from failed to successful. Does anyone have any advice? I've read the docs on state handlers and played around with them, but haven't quite found what I'm looking for. Sample code here:
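Rather than flipping a Failed state back to Success after the fact, one alternative is to keep the fallback inside the same unit of work: catch the failure, re-run alternate work with the original arguments, and fail only if that also fails, so the task's final state already reflects the outcome. A plain-Python sketch in which `with_fallback`, `fetch` and `fetch_from_replica` are hypothetical. As I understand it, a Prefect-enforced task timeout applies to the whole task, so the primary call would need its own timeout (e.g. a client-side request timeout) that raises inside the function for this wrapper to catch it:

```python
import functools


def with_fallback(fallback, exceptions=(Exception,)):
    """Decorator: if the wrapped call raises, retry the same arguments via fallback.

    If the fallback also raises, its exception propagates with the original
    one attached as __cause__, so both tracebacks remain visible.
    """
    def decorator(primary):
        @functools.wraps(primary)  # keep the original name and signature visible
        def wrapper(*args, **kwargs):
            try:
                return primary(*args, **kwargs)
            except exceptions as first_error:
                try:
                    return fallback(*args, **kwargs)
                except Exception as second_error:
                    raise second_error from first_error
        return wrapper
    return decorator


def fetch_from_replica(table, limit=100):
    """Hypothetical alternate path that receives the original arguments."""
    return f"replica:{table}:{limit}"


@with_fallback(fetch_from_replica)
def fetch(table, limit=100):
    """Hypothetical primary path that fails."""
    raise TimeoutError("primary timed out")
```

In a Prefect 1.x flow, the decorated function would then be wrapped with @task as usual, so a failed primary followed by a successful fallback ends in Success without any state rewriting.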

    Eric Mauser

    03/28/2022, 8:50 PM
    Hey everyone, I'm trying to parallelize a flow that consists of several AirbyteConnectionTasks. The flow runs fine on the ECS agent, but it runs the tasks in series. Is it possible to run the tasks in parallel when I generate them in a for loop, or is Prefect just running the tasks as they are generated? If so, how would this be done? Here is a toy code sample of what I'm working with:
    connections = ['conn1', 'conn2', 'conn3']

    with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
        for conn_id in connections:
            flow.add_task(
                AirbyteConnectionTask(
                    airbyte_server_host=<Airbyte host>,
                    airbyte_server_port=<airbyte port>,
                    airbyte_api_version="v1",
                    connection_id=conn_id,
                )
            )
    flow.run(executor=LocalDaskExecutor())
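Tasks like these have no edges between them, so they can run concurrently once the flow uses a parallel executor. As I understand Prefect 1.x, flow.run(executor=...) only affects local runs; for runs picked up by an agent, the executor is set on the flow itself (e.g. flow.executor = LocalDaskExecutor()) before registering. LocalDaskExecutor uses a thread pool by default, so the stdlib sketch below, with a hypothetical `trigger_sync` standing in for one Airbyte sync, shows the behaviour to expect: every sync starts without waiting for the others.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def trigger_sync(conn_id):
    """Hypothetical stand-in for one Airbyte connection sync (an I/O-bound wait)."""
    time.sleep(0.2)
    return f"{conn_id}: succeeded"


def run_all(connections, sync=trigger_sync, max_workers=None):
    """Run one sync per connection concurrently and return results keyed by id."""
    workers = max_workers or max(1, len(connections))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {conn_id: pool.submit(sync, conn_id) for conn_id in connections}
        return {conn_id: fut.result() for conn_id, fut in futures.items()}


if __name__ == "__main__":
    start = time.perf_counter()
    print(run_all(["conn1", "conn2", "conn3"]))
    # Roughly 0.2 s rather than 0.6 s, because the waits overlap.
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```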

    Leo Kacenjar

    03/28/2022, 9:47 PM
    Hi there, I'm wondering exactly how the Prefect Docker agent calls docker run. I have a CMD in my Dockerfile and it doesn't seem to execute, which makes me think it is being overridden. Maybe I have to provide an ENTRYPOINT instead?
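Under Docker's exec-form semantics, a command passed to docker run replaces the image's CMD, while ENTRYPOINT is kept and receives that command as its arguments. My understanding is that the Prefect 1.x Docker agent starts the container with its own command (the log in the thread below shows a "prefect execute flow-run" step), which would explain a CMD that never runs. The sketch models only that Docker rule; the image values are illustrative:

```python
def effective_argv(entrypoint=None, cmd=None, run_command=None):
    """Return the argv a container starts with (exec-form ENTRYPOINT/CMD).

    - run_command (the command given after the image in `docker run`) replaces CMD.
    - ENTRYPOINT, if set, is kept and receives CMD or run_command as arguments.
    """
    args = list(run_command) if run_command else list(cmd or [])
    return list(entrypoint or []) + args


if __name__ == "__main__":
    agent_cmd = ["prefect", "execute", "flow-run"]
    # CMD only: the agent's command wins and the Dockerfile CMD never runs.
    print(effective_argv(cmd=["python", "setup_env.py"], run_command=agent_cmd))
    # ENTRYPOINT script: it runs first and receives the agent's command as "$@".
    print(effective_argv(entrypoint=["/entrypoint.sh"], cmd=["python", "setup_env.py"], run_command=agent_cmd))
```

So an ENTRYPOINT script that performs the setup and then ends with exec "$@" is a common way to keep both the setup step and the agent's command. Note that a shell-form ENTRYPOINT ignores these arguments entirely.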

    Sacha Ventura

    03/29/2022, 5:36 AM
    Hi there, looking for some help running an ECS Agent with S3 storage. Getting a weird boto error.
import prefect
from prefect import task, Flow
from prefect.storage import S3

@task
def print_task():
    logger = prefect.context.get("logger")
    logger.info("hello world")

with Flow("hello-flow") as flow:
    print_task()

flow.storage = S3(bucket="****")

if __name__ == '__main__':
    flow.run()
I was able to get an ECS Service running the agent. So I do have an agent running under the "Agents" tab in the Prefect Cloud UI.
I was able to get past "Error downloading Flow from S3: Unable to locate credentials" by providing all the possible combinations of AWS env vars in the task definition container.
Now I'm getting something that looks like a boto error:
15:58:12 | INFO | agent | Submitted for execution: Task ****
15:58:47 | INFO | S3 | Downloading flow from s3://***/hello-flow/2022-03-29t04-10-25-834594-00-00
15:58:47 | ERROR | S3 | Error downloading Flow from S3: 'str' object has no attribute 'get'
15:58:48 | ERROR | execute flow-run | Failed to load and execute flow run: AttributeError("'str' object has no attribute 'get'")
any help appreciated 🙏
(I'm running prefecthq/prefect:1.1.0-python3.8 as the agent image.)

Anna Geller

03/29/2022, 9:48 AM
@Sacha Ventura Here are some resources that may help you get started with Prefect and an ECS agent:
1. A blog post explaining how to set up an ECS agent as an ECS service: "How to Cut Your AWS ECS Costs with Fargate Spot and Prefect: Serverless Data Engineering Pipelines in Python" (Medium, 1 Nov 2021)
2. This repo has many examples with various storage and run configuration options; check the ones with ecs in their name: https://github.com/anna-geller/packaging-prefect-flows/tree/master/flows
The AttributeError you get comes from boto3, not Prefect. You need to create a task role (IAM role) with S3 permissions; ECS won't work with credentials provided as env variables.
The blog post above shows how you can do that.
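To illustrate the task-role permissions mentioned above, here is a sketch of a least-privilege IAM policy document for reading flows from the storage bucket, built as a Python dict so it can be serialized with json.dumps and attached to the role referenced as the ECS task definition's task role (taskRoleArn, as opposed to the execution role). The bucket name is a placeholder, and whether your setup also needs write access (for example when registering flows from the same role) is an assumption to check:

```python
import json


def flow_storage_read_policy(bucket):
    """IAM policy document granting read access to flows stored in one S3 bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadPrefectFlows",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],  # objects in the bucket
            },
            {
                "Sid": "ListFlowBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],  # the bucket itself
            },
        ],
    }


if __name__ == "__main__":
    # "my-prefect-flows" is a placeholder bucket name.
    print(json.dumps(flow_storage_read_policy("my-prefect-flows"), indent=2))
```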

Sacha Ventura

04/06/2022, 2:20 AM
Amazing @Anna Geller thank you for your answers! I’ll give it a go
Everything worked on the first go. @Anna Geller, thank you so much!