prefect-community
  • z

    Zach Khorozian

    04/01/2021, 2:49 PM
    Hello, what is the best way to use global variables, for example database engine objects, in different tasks?
    m
    • 2
    • 3
  • l

    Luis Gallegos

    04/01/2021, 4:58 PM
    Hi all, can you help me? I need sequential execution with the map method (serial, not parallel), or something similar, to loop sequentially over a list of params. Here's my code:
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalDaskExecutor, DaskExecutor
    from prefect.tasks.prefect import StartFlowRun
    import prefect
    
    executor = LocalDaskExecutor(num_workers=1)
    
    
    flow1 = StartFlowRun("flow1", project_name='test', wait=True)
    flow2 = StartFlowRun("flow2", project_name='test', wait=True)
    
    with Flow("example", executor=executor) as flow:
    
        # Build one parameter dict per line of the input file
        table_dict_param_list = []
        with open('parameters.txt', 'r') as f:
            lines = f.readlines()
            for cnt, line in enumerate(lines):
                dict_param = {}
                dict_param['param1'] = cnt
                dict_param['param2'] = line
                table_dict_param_list.append(dict_param)
    
        # Use a new name so the StartFlowRun task object is not shadowed
        flow1_run = flow1()
    
        ## i need this execution to be sequential like in a "for loop"
        flow2.map(parameters=table_dict_param_list)
    
    flow.register(project_name="test")
    m
    • 2
    • 7
  • n

    Nicholas Chammas

    04/01/2021, 5:17 PM
    Design question: Why isn’t
    idempotency_key=flow.serialized_hash()
    the default? https://docs.prefect.io/orchestration/concepts/flows.html#core-client I can’t think of why someone would want the Flow version to change if the Flow definition hasn’t.
    j
    m
    m
    • 4
    • 21
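The reasoning in this question rests on a hash of the serialized definition staying constant for as long as the definition itself does. A minimal sketch of that idea, assuming a plain dict stands in for a serialized flow; `definition_key` is a hypothetical helper for illustration and is not how Prefect implements `serialized_hash()`.

```python
import hashlib
import json


def definition_key(definition: dict) -> str:
    """Return a stable SHA-256 hex digest of a JSON-serializable definition."""
    # sort_keys gives a canonical form, so key order does not change the hash
    canonical = json.dumps(definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical stand-ins for serialized flow definitions
flow_v1 = {"name": "example", "tasks": ["extract", "load"]}
flow_same = {"tasks": ["extract", "load"], "name": "example"}
flow_v2 = {"name": "example", "tasks": ["extract", "transform", "load"]}

key_v1 = definition_key(flow_v1)
key_same = definition_key(flow_same)
key_v2 = definition_key(flow_v2)
```

Registering with a key like this would leave the version untouched when the same definition is submitted again, which is the behaviour the question proposes as a default.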
  • r

    Riley Hun

    04/01/2021, 5:58 PM
    Hello all, I'm migrating to the newly released versioned Prefect Server helm chart and deploying it on GCP, but I seem to be running into a couple deployment issues.
    prefect-server-towel
    seems to have an
    ErrImagePull
    error, and
    prefect-server-hasura
    and
    prefect-server-ui
    can't pull their respective images from the registry. Here's my deployment command I entered:
    helm repo add prefecthq https://prefecthq.github.io/server/
    helm install ${NAME} prefecthq/prefect-server --values=values.yaml
    m
    • 2
    • 9
  • l

    liren zhang

    04/01/2021, 10:28 PM
    Hi experts, I created a flow of flows with StartFlowRun. Dependencies have been set in the control flow, and I have set wait=True in each StartFlowRun(wait=True, **kwargs). Based on my understanding, the run should proceed to the dependent flows once the upstream flows have executed successfully. I set up the test so that flow3 depends only on flow2, and flow2 depends only on flow1. I thought flow2 would start executing when flow1 succeeded, but it did not start until flow4 finished running. This is not what I expected. Can you see what I might have done wrong?
    m
    • 2
    • 4
  • e

    eli

    04/01/2021, 11:04 PM
    Hi all, I'm trying to run a long-running task that takes in a local DaskCluster, but it seems like something in Prefect arbitrarily kills off the worker... My flow uses the Dask ResourceManager and looks something like
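    from typing import List
    
    from distributed import Client, Future
    from prefect import Flow, task
    from prefect.executors import LocalExecutor
    
    @task(checkpoint=False)
    def long_running_dask_task(inputs: dict, client: Client) -> bool:
      futures: List[Future] = []
    
      # Submit work until get_next() (defined elsewhere) has nothing left
      while True:
        item = get_next(inputs)
        if not item:
          break
        futures.append(client.submit(func, item))
    
      # Block until every submitted future has finished
      client.gather(futures)
      return True
    
    with Flow('local-dask-flow') as flow:
      with DaskCluster(...) as client:
        long_running_dask_task(param_1, client)
      flow.executor = LocalExecutor()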
    https://docs.prefect.io/core/idioms/resource-manager.html#example-creating-a-temporary-dask-cluster
  • j

    Jonathan Chu

    04/02/2021, 12:34 AM
    hi guys, how do i control the image name that is used for docker storage? e.g. if i enter a flow name with underscores, it converts it to dashes to get a repo name https://docs.prefect.io/orchestration/flow_config/storage.html#docker
    c
    • 2
    • 3
  • c

    CA Lee

    04/02/2021, 1:21 AM
    Hello all, has anyone experienced issues with agent health? Long running agent processes have suddenly stopped polling Prefect Cloud
    n
    • 2
    • 7
  • m

    matta

    04/02/2021, 1:47 AM
    Anyone else had the problem of mapped tasks succeeding with every sub-task but staying in a
    mapped
    state?
    n
    c
    +3
    • 6
    • 35
  • j

    Jonathan Chu

    04/02/2021, 1:48 AM
    the
    RUN
    tab with
    Docker
    doesn't seem to parse the JSON version of the Environment variables correctly; it seems to keep the wrapping quotes as part of the value
    n
    m
    • 3
    • 6
  • j

    Jeremy Tee

    04/02/2021, 7:47 AM
    hi everybody, I am currently using
    prefect cloud
    , and my flow will invoke
    aws lambda
    and return the response to me. However, after registering my flow, whenever I try to run it, it throws
    Unexpected error: TypeError("cannot serialize '_io.BufferedReader' object")
    Is there a workaround on this?
    @task(name="invoke_lambda")
    def invoke_lambda(function_name, table_path, etl_target_date):
        lambda_client = boto3.client("lambda")
        response = lambda_client.invoke(
            FunctionName=function_name,
            Payload=json.dumps({"table_path": table_path, "etl_target_date": etl_target_date}),
        )
        return response
    
    
    with Flow(
        "test-flow",
        executor=LocalExecutor(),
        run_config=LocalRun(),
        storage=S3(
            bucket="random-bucket",
        ),
    ) as flow:
        x = invoke_lambda("test", "a/b/c", "2021/04/02")
    
    
    flow.register(project_name="xxx", labels=["dev"])
    c
    z
    +2
    • 5
    • 27
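A likely cause of the error above is that the returned response still holds an open stream: boto3 returns the Lambda `Payload` as a streaming body, and stream objects cannot be pickled when a task result is serialized. A minimal sketch of one possible workaround, assuming it is acceptable to read the stream inside the task and return plain data; `extract_payload` and `fake_response` are hypothetical names, and the response is simulated with standard-library objects so the snippet runs without AWS.

```python
import io
import json
import pickle


def extract_payload(response: dict) -> dict:
    """Read the stream in the response and return plain, picklable data."""
    body = response["Payload"].read()
    return {"StatusCode": response["StatusCode"], "Payload": json.loads(body)}


# Stand-in for the dict returned by the Lambda client (assumption: the real
# response carries a stream-like object under "Payload")
fake_response = {
    "StatusCode": 200,
    "Payload": io.BufferedReader(io.BytesIO(b'{"ok": true}')),
}

# Pickling the raw response fails because of the open stream
try:
    pickle.dumps(fake_response)
    raw_is_picklable = True
except TypeError:
    raw_is_picklable = False

# Pickling the extracted data works
result = extract_payload(fake_response)
restored = pickle.loads(pickle.dumps(result))
```

In the task from the message, the equivalent change would be to return the extracted data instead of `response`.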
  • m

    Matthew Blau

    04/02/2021, 1:34 PM
    Hello all, I see that I can set up retry logic with
    @task(max_retries=3)
    but how can I set up the retry logic if I am not setting up tasks with the functional API? I do not see anything in the docs that explains. Thank you in advance!
    z
    k
    e
    • 4
    • 8
  • m

    Marwan Sarieddine

    04/02/2021, 4:13 PM
    Hi Folks, we are in the process of migrating to make use of a KubernetesRun config and a DaskExecutor, alongside our Kubernetes agent on EKS. We seem to be running into issues running our flows with a custom context from the prefect UI.
    👀 1
    n
    m
    • 3
    • 16
  • n

    Nikola Milushev

    04/02/2021, 4:34 PM
    Hi all, we have a flow which is infinitely running because a mapped task is stuck on "Starting task run...". The log is showing just that, nothing more. The task itself is decorated with
    @task(max_retries=6, retry_delay=timedelta(minutes=10), timeout=60)
    , however it seems the task run is stuck before it can trigger the retry. Could there be another reason for this besides the OOM Killer, as seen in a similar topic from 15.03?
    n
    • 2
    • 3
  • j

    Jay Shah

    04/03/2021, 11:41 PM
    Hi, we are using the SQL Server task SqlServerExecute to execute a truncate table query (and also a merge query), and we encountered the error below. The documentation suggests that the data field is optional: https://docs.prefect.io/api/latest/tasks/sql_server.html#sqlserverexecute We are able to execute SqlServerExecuteMany and SqlServerFetch. Can someone help?
    Unexpected error: TypeError('execute() takes no keyword arguments')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 454, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/tasks/sql_server/sql_server.py", line 90, in run
        executed = cursor.execute(query=query, vars=data)
    TypeError: execute() takes no keyword arguments
    k
    • 2
    • 7
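The traceback points at `cursor.execute(query=query, vars=data)`: the driver's `execute()` accepts positional arguments only. A minimal sketch of the same failure and the positional form, using the standard-library `sqlite3` module as a stand-in for the SQL Server driver (an assumption made so the snippet runs anywhere; `sqlite3` cursors also reject keyword arguments). The `items` table is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE items (id INTEGER)")
cursor.execute("INSERT INTO items VALUES (1)")

query = "DELETE FROM items"

# Keyword form, mirroring the call in the traceback: raises TypeError
try:
    cursor.execute(query=query)
    keyword_call_failed = False
except TypeError:
    keyword_call_failed = True

# Positional form is accepted
cursor.execute(query)
remaining = cursor.execute("SELECT COUNT(*) FROM items").fetchone()[0]
```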
  • t

    tash lai

    04/05/2021, 6:01 AM
    Hi all. Can anyone explain how garbage collection works? For example, will the result of
    produce
    stay in memory until the flow finishes, or will it be removed as soon as
    consume
    finishes?
    @task
    def produce(url):
        return download_big_json(url)
    
    @task
    def consume(big_json):
        do_something(big_json)
    
    with Flow('my_flow') as flow:
        urls = Parameter('urls')
        produced = produce.map(urls)
        consume.map(produced)
    k
    • 2
    • 6
  • v

    Varun Joshi

    04/05/2021, 8:20 AM
    Hey Prefecters, how do I upgrade from prefect version 0.14.9 to 0.14.10?
    k
    • 2
    • 1
  • j

    Jeremy Tee

    04/05/2021, 10:00 AM
    hi everybody, just wondering if it's possible to search by `task_run_id` or `flow_id` from the prefect UI?
    v
    k
    • 3
    • 2
  • r

    Rob Fowler

    04/05/2021, 12:16 PM
    is there a way to initialise a custom result handler in a prefect agent run flow? I can, of course, initialise a result handler at build time but now I am all fancy with common containers for test and production I don't want the redis/azure result destination to be hard coded in the docker storage.
    k
    m
    • 3
    • 48
  • h

    haf

    04/05/2021, 5:11 PM
    Hi there! Does anybody here work with Prefect together with Python notebooks and MLOps/Feature/Model stores?
    k
    a
    • 3
    • 33
  • b

    Brett Naul

    04/05/2021, 6:12 PM
    curious if anyone has any thoughts on how to avoid this kind of looping behavior when the dask worker is repeatedly killed after running out of memory...is this what version locking is for? I can't remember exactly how that works and it doesn't seem to really be documented anywhere
    k
    a
    • 3
    • 16
  • j

    Joseph Loss

    04/05/2021, 6:54 PM
    Hey guys, can someone please point me in the direction of importing user-created functions from other python files? We have a large library of common functions that are used in almost every script, I'm developing a use-case for using Prefect over VisualCron but I'm running into some issues here
    k
    • 2
    • 15
  • t

    Tomás Emilio Silva Ebensperger

    04/05/2021, 9:43 PM
    Any experience/advice creating flows dynamically in one script? I have several client configs in JSON files and one flow looping over those configs to execute the tasks for each client. I was thinking of looping dynamically to create many flows instead. Any thoughts?
    d
    • 2
    • 7
  • z

    Zach Hodowanec

    04/05/2021, 9:54 PM
    Hey team, is it possible to use a Deploy Key instead of a PAT for authentication when using GitHub Storage?
    d
    k
    r
    • 4
    • 14
  • w

    Willian Chan

    04/05/2021, 9:59 PM
    Hello everyone, I'm stuck on an error when running my flow with helper scripts from
    GitLab
      The UI tells me: 
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'mail_client'")
    . The main problem here is that the file mail_client.py is not present on the agent, and it is impractical for me to send each auxiliary script to the agent (there are going to be a lot of flows). The structure of the repository:
    gitlab-repository/
    ├── flow.py
    └── mail_client.py
    Inside my flow.py it imports the mail_client:
    from mail_client import MailClient
    ...
    ...
    The configuration for 
    GitLab
     storage:
    flow.storage = GitLab(
        repo="XXXXX",
        host="XXXXX",
        path="flow.py",
        secrets=["GITLAB_ACCESS_TOKEN"]
    )
    I need the agent to be able to pull the entire repository, because many processes will be added to Prefect and there is no way to change the agent with each modification to a process. Does anyone have a solution for this? Thanks
    k
    • 2
    • 16
  • j

    Jonathan Chu

    04/06/2021, 12:40 AM
    how are flow configurations supposed to be used? https://docs.prefect.io/orchestration/agents/docker.html#flow-configuration these labels and env variables are presumably specific to each agent that i start up, so i can control, say, staging and prod but this looks like something global that's checked in to the codebase that goes with the flow code definition
    c
    • 2
    • 7
  • m

    matta

    04/06/2021, 1:51 AM
    Hrm, so if I run the
    DaskExecutor
    in a Jupyter notebook and I'm using threads (
    cluster_kwargs={"processes": False}
    ) then I see the logs in the notebook. If I take that off though, then the logs disappear. How do I have the logs go to the notebook again?
    k
    • 2
    • 3
  • w

    Wolfgang Steitz

    04/06/2021, 9:36 AM
    Hey! I have a task that is scheduled to run every 4 hours. However, sometimes the associated agent isn't running for, let's say, a day. In that case the pending runs obviously pile up. I'd like to avoid that, especially because there is no point in running that many runs the next day. I can imagine 3 ways to implement such behaviour, but unfortunately I haven't found anything in the docs so far: 1. set a limit on pending runs of a given flow 2. time out a pending run after some time 3. add a task to the flow that checks the scheduled time and skips the actual task if above some threshold. I assume 1 and 2 are not available. 3 should be possible somehow. Any pointers on how to implement this?
    ✅ 1
    c
    s
    • 3
    • 3
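Option 3 from the message above reduces to one comparison: how far the current time is past the scheduled start. A minimal sketch of that check in plain Python; `is_stale` and `max_lag` are hypothetical names. Inside a Prefect 0.14 task, the scheduled time should be available from `prefect.context` (as `scheduled_start_time`) and the task could raise `prefect.engine.signals.SKIP` when the check returns True, but verify both against the docs for your version.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    scheduled_start: datetime,
    max_lag: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Return True when a run is starting more than max_lag after its schedule."""
    now = now or datetime.now(timezone.utc)
    return now - scheduled_start > max_lag


scheduled = datetime(2021, 4, 5, 8, 0, tzinfo=timezone.utc)
one_day_late = datetime(2021, 4, 6, 8, 0, tzinfo=timezone.utc)
ten_minutes_late = datetime(2021, 4, 5, 8, 10, tzinfo=timezone.utc)

# With a 4-hour threshold, only the run picked up a day late is stale
late_run_stale = is_stale(scheduled, timedelta(hours=4), now=one_day_late)
prompt_run_stale = is_stale(scheduled, timedelta(hours=4), now=ten_minutes_late)
```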
  • l

    Levi Leal

    04/06/2021, 11:04 AM
    I'm creating an integration with Datadog and I need to add a custom handler to Prefect's flows. I need a handler that gets all logs from the run and emits them as JSON. I've seen a lot of examples like the one below, but that's not what I need. I don't want to add a handler to each logger.
    logger = prefect.context.get('logger')
    logger.addHandler(log_handler)
    I need something like this
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(DatadogFormatter())
    get_logger().addHandler(log_handler)
    I add the handler to the 'root' logger and everything is logged the way I need. I've tried the latter and it works fine with
    flow.run()
    , but when I register the flow I can't get it to work with k8s. More details in the thread
    m
    k
    +3
    • 6
    • 17
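The approach described above, one handler on a top-level logger that formats every record as JSON, can be sketched with the standard library alone. `JsonFormatter` is a hypothetical stand-in for the `DatadogFormatter` in the message, and the logger name `prefect.demo` is made up. Whether Prefect's loggers propagate to this handler inside a Kubernetes job is not something this sketch verifies.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
            }
        )


# Attach one handler to the root logger; child loggers propagate to it
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

logging.getLogger("prefect.demo").info("run started")
logged = json.loads(stream.getvalue().splitlines()[-1])
```

The handler has to be attached in the process that actually executes the flow, so code that runs only at registration time would not affect a run started later by an agent.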
  • h

    haf

    04/06/2021, 1:07 PM
    Hi, I'm back at https://prefect-community.slack.com/archives/CL09KU1K7/p1615401726121200?thread_ts=1615327031.065400&cid=CL09KU1K7 trying to make it work; has job templating (being able to add a single annotation) started working?
    k
    t
    • 3
    • 15
h

haf

04/06/2021, 1:07 PM
Hi, I'm back at https://prefect-community.slack.com/archives/CL09KU1K7/p1615401726121200?thread_ts=1615327031.065400&cid=CL09KU1K7 trying to make it work; has job templating (being able to add a single annotation) started working?
Is there a way to supply a job template that works now?
Ref docs https://docs.prefect.io/orchestration/agents/kubernetes.html#running-in-cluster
I created this bug report https://github.com/PrefectHQ/prefect/issues/4359
k

Kevin Kho

04/06/2021, 2:19 PM
Hey @haf, I will look into this when I get the chance and probably respond to you on the Github issue.
🙌 1
h

haf

04/06/2021, 2:20 PM
Thanks @Kevin Kho
t

Tyler Wanner

04/06/2021, 5:20 PM
Hi @haf. I've provided an update in your bug ticket. Please look into it and let me know if I'm understanding properly
👍 1
h

haf

04/06/2021, 5:25 PM
@Tyler Wanner That is good progress towards a repro. I have this issue as we speak but I’m doing some exercise before I get back tonight.
I’ll improve the issue then with some findings.
Thank you for looking into it.
t

Tyler Wanner

04/06/2021, 5:26 PM
sure thing--i'll actually add the things i used in my attempt at repro--i can tell you that i had multiple issues getting it working, it was not a great experience and I'll definitely be looking to proper it up
h

haf

04/06/2021, 5:26 PM
Yeah I never got it working before. Maybe I will now :)
🚀 1
t

Tyler Wanner

04/06/2021, 5:27 PM
do let me know
also, i noticed you mounted to
/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/job_template.yaml
that's the install location for python3.8 but in the prefect image, by default it runs 3.7. if you mount it to
/usr/local/lib/python3.7/site-packages/prefect/agent/kubernetes/job_template.yaml
then you don't have to actually change the args at all (this is equivalent to overwriting the default file which, as you said, works)
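The mount path above differs only in the interpreter's minor version, so it can be derived rather than typed by hand. A minimal sketch, assuming the `/usr/local` prefix and directory layout shown in the two paths quoted in this thread; `job_template_path` is a hypothetical helper.

```python
import sys
from pathlib import PurePosixPath
from typing import Optional, Tuple


def job_template_path(
    prefix: str = "/usr/local",
    version: Optional[Tuple[int, int]] = None,
) -> str:
    """Build the job_template.yaml path for a given Python (major, minor)."""
    # Default to the version of the interpreter running this code
    major, minor = version or sys.version_info[:2]
    return str(
        PurePosixPath(prefix)
        / "lib"
        / f"python{major}.{minor}"
        / "site-packages"
        / "prefect"
        / "agent"
        / "kubernetes"
        / "job_template.yaml"
    )


path_for_37 = job_template_path(version=(3, 7))
path_for_38 = job_template_path(version=(3, 8))
```

Running the function without arguments inside the agent image would give the path for whichever interpreter that image actually uses.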
h

haf

04/19/2021, 4:57 PM
I solved it (in a rather hacky way!)
t

Tyler Wanner

04/19/2021, 10:21 PM
glad to hear it! responded in the issue because probably you won’t be the only person to try to do something like this, and it’s more searchable there. I’ll try to think on how the experience can be improved but generally your run config is going to be the better place to set those annotations and args