Powered by Linen
prefect-community
  • s

    Severin Ryberg [sevberg]

    12/18/2020, 9:35 PM
    I am having a recurring issue with jobs I submit with Prefect (using a DaskKubernetesEnvironment and a Kubernetes cluster on Amazon EKS). In the screenshot below you can see that I've mapped over 3000 tasks within my flow, but only a small fraction of them (in this case 173) actually run. Note that the tasks labeled 'running' in the pictures have actually already been canceled; this just isn't reflected in the Cloud UI.
    m
    • 2
    • 10
  • m

    matta

    12/19/2020, 5:21 AM
    What would be the "prefect-ic" way to make a little library with utility functions that multiple flows call?
    👀 1
    k
    • 2
    • 2
  • v

    Vitaly Shulgin

    12/19/2020, 9:47 AM
    Hello Team, I built a base image, configured the flow to use it as its base, and specified a local environment, but the agent tries to pull the image from docker.io. Any ideas? How can I prevent it from trying to pull the image from docker.io?
    ✅ 1
    • 1
    • 9
  • v

    Vitaly Shulgin

    12/19/2020, 1:41 PM
    Hello team, KubernetesEnvironment fails to submit job with the following error
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"\" is invalid: [metadata.name: Required value: name or generateName is required, spec.template.spec.containers[0].name: Required value, spec.template.spec.restartPolicy: Required value: valid values: \"OnFailure\", \"Never\"]","reason":"Invalid","details":{"group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value: name or generateName is required","field":"metadata.name"},{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].name"},{"reason":"FieldValueRequired","message":"Required value: valid values: \"OnFailure\", \"Never\"","field":"spec.template.spec.restartPolicy"}]},"code":422}
    ✅ 1
    • 1
    • 5
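The 422 response above names three missing fields: `metadata.name` (or `generateName`), `spec.template.spec.containers[0].name`, and `spec.template.spec.restartPolicy`. A minimal sketch of a pre-submit check for those same fields, using plain dictionaries rather than the Kubernetes client; the function name `missing_job_fields` and the sample spec are hypothetical.

```python
def missing_job_fields(job):
    """Return the required fields named in the 422 error that are absent from a Job dict."""
    missing = []
    metadata = job.get("metadata", {})
    if not (metadata.get("name") or metadata.get("generateName")):
        missing.append("metadata.name")
    pod_spec = job.get("spec", {}).get("template", {}).get("spec", {})
    containers = pod_spec.get("containers") or [{}]
    if not containers[0].get("name"):
        missing.append("spec.template.spec.containers[0].name")
    if pod_spec.get("restartPolicy") not in ("OnFailure", "Never"):
        missing.append("spec.template.spec.restartPolicy")
    return missing


# Hypothetical job spec that passes all three checks.
job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "prefect-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "flow", "image": "my-image:latest"}],
                "restartPolicy": "Never",
            }
        }
    },
}

print(missing_job_fields({}))
print(missing_job_fields(job_spec))
```

An empty spec reports all three fields, which matches the shape of the error in the message.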
  • s

    simone

    12/19/2020, 2:18 PM
    Hi ! I am running the following simple testing flow:
    import time

    import prefect
    from prefect import task, Flow, Parameter, flatten, unmapped
    from prefect.executors import DaskExecutor
    from prefect.run_configs import LocalRun
    
    @task
    def test_parallel(a):
        # Simulate a slow task so parallel execution is visible
        time.sleep(20)
    
    with Flow("filtering-counting", run_config=LocalRun(), executor=DaskExecutor(address='tcp://193.10.16.58:3111')) as flow:
        all_data = Parameter('all_data', default=list(range(10)))
        mapped = test_parallel.map(all_data)
    flow.register(project_name="test")
    I have updated to the new version of prefect and I get
    prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
    In the setup with the previous version of prefect I was not getting the error. I still get the error even if I set
    flow.set_reference_tasks([mapped])
    I have been going through the docs but I still cannot understand where the error is coming from. Thanks!
    c
    • 2
    • 4
  • p

    Pedro Machado

    12/21/2020, 5:37 AM
    Hi. Question about
    ShellTask
    . I am running a flow using the docker agent and I am passing some env vars to the agent with the
    -e var=value
    option. I suppose these variables are available to the shell task. Am I correct, or do I need to read them from the environment and pass them explicitly to the task?
    k
    • 2
    • 1
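Whether the agent's `-e` values reach the flow run is specific to the agent and is not settled here. Independent of that, a child process started from Python inherits the parent's environment unless `env` is overridden. A minimal sketch with the standard library only (this is not `ShellTask` itself); the variable name `MY_VAR` and the helper `child_sees` are hypothetical.

```python
import os
import subprocess
import sys

# Child command that prints the value of MY_VAR as seen by the child process.
CHILD = [sys.executable, "-c", "import os; print(os.environ.get('MY_VAR', 'unset'))"]


def child_sees(env=None):
    """Run the child and return what it printed; env=None means inherit the parent's environment."""
    result = subprocess.run(CHILD, env=env, capture_output=True, text=True, check=True)
    return result.stdout.strip()


os.environ["MY_VAR"] = "from-parent"
inherited = child_sees()                                      # inherited from this process
explicit = child_sees({**os.environ, "MY_VAR": "explicit"})   # passed explicitly
print(inherited, explicit)
```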
  • s

    Sagun Garg

    12/21/2020, 9:24 AM
    Hi, I am struggling with my pods getting killed with the error "Warning Unhealthy kubelet Liveness probe failed :8080/api/health" on AWS EKS Fargate. Refer: https://prefect-community.slack.com/archives/C0192RWGJQH/p1608535632114500
    👀 1
  • j

    Jonas Hanfland

    12/21/2020, 10:59 AM
    Hey guys, one of my tasks is consistently getting stuck in
    running
    , causing it to run for days, but locally it seems to run fine. The logs are empty. It's a
    BigQueryTask
    which writes the result to a table. What would be the best way to figure out what's going wrong?
    a
    k
    • 3
    • 2
  • m

    Mary Clair Thompson

    12/21/2020, 1:28 PM
    You can bring up a local agent using the command 'prefect agent local start', but there doesn't seem to be a similar command for tearing down agents. What's the correct way to bring down an agent?
    k
    • 2
    • 3
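Assuming the agent runs as an ordinary long-lived process (an assumption, not something confirmed in this thread), bringing it down comes down to signalling that process. A minimal sketch with a stand-in command in place of the real agent command line:

```python
import subprocess
import sys

# Stand-in for a long-running agent command; replace with the real command line.
command = [sys.executable, "-c", "import time; time.sleep(60)"]

process = subprocess.Popen(command)
process.terminate()                      # SIGTERM on POSIX, TerminateProcess on Windows
exit_code = process.wait(timeout=10)     # block until the process has actually exited
print("stopped with exit code", exit_code)
```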
  • k

    Kevin Weiler

    12/21/2020, 5:45 PM
    Is there any way to change executors in the middle of a flow? Use-case in thread -->
    k
    • 2
    • 6
  • l

    liren zhang

    12/21/2020, 6:38 PM
    Hi Prefect experts, I am trying to run the following graphql query via prefect.client. However, it does not like the logs{ message} entry. I was able to run the same graphql query in interactive API. Can anyone point out what I am doing wrong?
    import json

    from prefect import Client

    def get_task_run(flow_run_id):
        task_run_query = """
            query {
            task_run ( where:{flow_run_id:{_eq: "%s"}}){
                id,
                name,
                start_time,
                end_time,
                state,
                state_message,
                created,
                flow_run_id,
                heartbeat,
                logs {message
                }
                state_result,
                state_start_time,
                state_timestamp,
                task_id,
                tenant_id,
                updated,
                version
            }
            }
        """ % flow_run_id
                # logs { 
                # id,
                # level,
                # message  
                # },
        task_run_results = json.loads(Client().graphql(task_run_query).to_json())
        for task_run in task_run_results['data']['task_run']:
            task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
            print('liren again.............')
            save_to_file(task_json,'task_run.log')
    k
    • 2
    • 10
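When one field is suspected of breaking a query, it can help to build the query from a field list so the suspect entry (here `logs { message }`) can be toggled on and off. A minimal sketch that only builds the string and does not call Prefect; `build_task_run_query` is a hypothetical helper.

```python
import json


def build_task_run_query(flow_run_id, fields):
    """Build a task_run GraphQL query string for one flow run."""
    # json.dumps produces a double-quoted, escaped literal, which is fine for plain IDs.
    quoted_id = json.dumps(flow_run_id)
    body = "\n".join("    " + field for field in fields)
    return (
        "query {\n"
        "  task_run(where: {flow_run_id: {_eq: %s}}) {\n"
        "%s\n"
        "  }\n"
        "}" % (quoted_id, body)
    )


base_fields = ["id", "name", "state", "state_message"]
query_without_logs = build_task_run_query("abc-123", base_fields)
query_with_logs = build_task_run_query("abc-123", base_fields + ["logs { message }"])
print(query_with_logs)
```

Running both variants against the same endpoint narrows down whether the nested field or something else in the request is being rejected.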
  • m

    mithalee mohapatra

    12/22/2020, 7:31 AM
    I have a question related to the Schedules and Prefect UI Scheduler. Can I use the Prefect Schedule to schedule a task in the prefect UI: I want to schedule the below code in the Prefect UI.
    from prefect import task, Flow
    from datetime import timedelta
    #from prefect.schedules import IntervalSchedule
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import IntervalClock
    import pendulum
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.engine.executors import DaskExecutor
    import prefect
    import boto3

    @task
    def say_hello():
        print("Hello, world!")
        logger = prefect.context.get("logger")
        logger.info("Hello, world!")
        try:
            s3 = boto3.resource('s3')
        except Exception as e:
            raise signals.FAIL(e)

    if __name__ == '__main__':
        schedule = Schedule(clocks=[IntervalClock(start_date=pendulum.datetime(2020, 12, 21,hour=6,minute=3,second=0),interval=timedelta(minutes=2))])
        with Flow("Hello") as flow:
            say_hello()
        executor=LocalDaskExecutor(scheduler="processes", num_workers=6)
        flow.run(executor=executor)
        flow.register(project_name="Hello, World!")
    j
    • 2
    • 9
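The clock in the message is given a start date of 2020-12-21 06:03:00 and a two-minute interval. A minimal sketch, using only `datetime` rather than Prefect, of the run times such an interval implies; the generator `interval_times` is hypothetical and illustrates the arithmetic only, not how the scheduler itself works.

```python
from datetime import datetime, timedelta
from itertools import islice


def interval_times(start, interval):
    """Yield start, start + interval, start + 2 * interval, and so on."""
    current = start
    while True:
        yield current
        current += interval


start = datetime(2020, 12, 21, 6, 3, 0)
first_three = list(islice(interval_times(start, timedelta(minutes=2)), 3))
for moment in first_three:
    print(moment.isoformat())
```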
  • j

    Joël Luijmes

    12/22/2020, 8:25 AM
    I’m running Prefect in Kubernetes with preemptible nodes (maybe not the best idea.. 🤫 ). Regardless, I just noticed some undesired behavior. At some point the agent fails to poll the GraphQL service, but then it just hangs. Agreed, the agent should be able to access the backend, and admittedly my setup might not be optimal, but I don’t think hanging is the right behavior when it can’t access the backend. What is your idea on this? And can we improve this? Possible suggestions:
    • Increase the retry limit (to unlimited) and the backoff_factor.
    • Maybe more specific to the KubernetesAgent, but adding health checks seems like a good idea nevertheless. This allows Kubernetes to monitor the agent and restart it when it does hang.
    j
    d
    • 3
    • 19
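The first suggestion (unlimited retries with a backoff factor) can be sketched as a polling loop with capped exponential backoff. This is a standalone illustration, not the agent's actual retry code; `backoff_delays`, `poll_forever`, and the parameter defaults are hypothetical.

```python
import time


def backoff_delays(backoff_factor, max_delay):
    """Yield an endless sequence of doubling delays, capped at max_delay."""
    delay = backoff_factor
    while True:
        yield min(delay, max_delay)
        delay = min(delay * 2, max_delay)


def poll_forever(poll, backoff_factor=1.0, max_delay=60.0, sleep=time.sleep):
    """Call poll() until it succeeds, sleeping between failed attempts."""
    for delay in backoff_delays(backoff_factor, max_delay):
        try:
            return poll()
        except ConnectionError:
            sleep(delay)


# Simulated backend that is unreachable for the first two polls.
attempts = []
slept = []


def flaky_poll():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("backend unreachable")
    return "ok"


result = poll_forever(flaky_poll, backoff_factor=0.5, max_delay=4.0, sleep=slept.append)
print(result, slept)
```

Capping the delay keeps an unlimited retry loop from drifting into very long sleeps, and a liveness check from the second suggestion would still be needed to catch a process that hangs outside this loop.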
  • s

    Sagun Garg

    12/22/2020, 11:01 AM
    Hey there, is there storage support for AWS ECR (to store our Docker image directly in ECR)? https://docs.prefect.io/orchestration/flow_config/storage.html I checked this but it doesn't explicitly state it. How do I specify the auth credentials for AWS ECR when using this? Does it automatically pick up my locally configured AWS credentials, or where do we specify the login credentials?
    m
    • 2
    • 2
  • a

    Ajith Kumara Beragala Acharige Lal

    12/22/2020, 4:04 PM
    Hi Prefect experts, I want to try out Prefect in our existing ETL pipeline for scheduling and Spark job management. I know Prefect is a best match for Python-based scripts (e.g. PySpark); would it support Spark Scala/Java jobs as well? Our ETL is mainly built with Scala Spark jobs. Are there any examples or documents related to this? Thank you in advance! 🙏 (Sorry if this is a duplicate question.)
    j
    • 2
    • 4
  • k

    Krzysztof Nawara

    12/22/2020, 6:09 PM
    Hello! If a Prefect pipeline fails partially (when using Pipeline.run()), is it possible to retrieve the States and results for the tasks that executed successfully?
    j
    c
    • 3
    • 3
  • s

    Sean Talia

    12/22/2020, 6:49 PM
    does anyone know if there's a way from the CLI to get a list of agents that you have running against your cloud instance?
    • 1
    • 1
  • p

    Pedro Martins

    12/22/2020, 7:24 PM
    Hey! Me again 🙂 I'm trying to run the aircraft example in a Kubernetes Agent. I created an image in which the
    aircraftlib
    comes already installed but when prefect jobs start running it can't find the package.
    [2020-12-22 18:10:45+0000] INFO - prefect.S3 | Downloading aircraft-etl/2020-12-22t18-10-34-141798-00-00 from dr-prefect
    No module named 'aircraftlib'
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/prefect/cli/execute.py", line 90, in flow_run
        raise exc
      File "/usr/local/lib/python3.6/site-packages/prefect/cli/execute.py", line 67, in flow_run
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.6/site-packages/prefect/storage/s3.py", line 115, in get_flow
        return cloudpickle.loads(output)
      File "/usr/local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 562, in subimport
        __import__(name)
    ModuleNotFoundError: No module named 'aircraftlib'
    Any clue how I can make it work?
    j
    • 2
    • 31
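The traceback shows `cloudpickle` importing `aircraftlib` by name while the flow is unpickled, so the package has to be importable by the Python interpreter that actually runs the flow. A minimal sketch of a check that could be run with that interpreter inside the image; `missing_modules` is a hypothetical helper and the second module name is deliberately bogus.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is not expected to exist anywhere.
print(missing_modules(["json", "surely_not_installed_module_xyz"]))
```

If the package shows up as missing there, it was likely installed for a different interpreter or in a location that is not on `sys.path` at run time.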
  • i

    itay livni

    12/22/2020, 10:49 PM
    @nicholas @Aiden Price I am also running into this error. Without
    apply_map
    the graph is fine. The flow runs but is incorrect in the sense that the
    case
    statement is not mapped over. With
    apply_map
    the execution graph is messy with lots of loops and just plain wrong.
    n
    a
    • 3
    • 12
  • p

    Pedro Machado

    12/23/2020, 4:49 AM
    Hi. I can't see the dbt output on Prefect Server while
    DbtShellTask
    is running. I set the agent default logging to DEBUG using the cli switch and I set Prefect Server's logging level via the env var
    PREFECT__LOGGING__LEVEL=DEBUG
    I also tried passing these arguments to
    DbtShellTask
    return_all=True,
    log_stdout=True,
    log_stderr=True,
    Any ideas?
  • a

    Amanda Wee

    12/23/2020, 11:41 AM
    If a local agent is running connected to a remote prefect server, and prefect ui shows the agent is indeed active, what could be the reason for a late flow when using "quick run"?
    p
    m
    • 3
    • 4
  • n

    Nabeel

    12/23/2020, 1:00 PM
    Hi everyone. I have a question and was wondering if anyone has advice on the best way to approach programmatically starting a prefect agent when we want to execute a job vs having the agent permanently running?... Thanks so much 🙌
    👍 1
    m
    • 2
    • 2
  • g

    Ganesh Kalyansundaram

    12/23/2020, 1:05 PM
    Hey guys! Let me describe my use case to you. I have a PaddlePaddle model and a TensorFlow 2.0 model which run inference over a video stream. I'm using a Local Dask Execution Environment to run these two models in parallel. After the flow quits, the Dask worker still seems to be holding the memory and not releasing it for some reason. In order to get the memory released, I have to kill the Dask worker (which I'm doing with a keyboard interrupt now). So I decided to move the Prefect Local Agent, Dask scheduler and Dask worker under Monit (a Unix utility to monitor processes).
    1. I'm not able to get Monit to work with any of these services. Does anyone have experience?
    2. Would it be sensible to have the final node in the DAG look at the Dask worker's PID and use pkill to kill it? That way, before the next flow run starts, Monit can recover and restart the Dask worker.
    Would greatly appreciate it if someone who's worked on something similar can help, since I'm kinda clueless now. 😫
    m
    • 2
    • 5
  • r

    Robert Sokolowski

    12/23/2020, 2:23 PM
    Hey folks, does anyone have guidance for dealing with agents that may go down? One idea I have is to write a daemon (or similar) that checks a config file which says "We should have X agents of label XX, Y agents of label YY, etc...". I'm not sure if there's anything more "Prefect-y" to deal with this
    ➕ 1
    m
    g
    • 3
    • 3
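The daemon idea in the message boils down to comparing desired agent counts per label with what is currently running. A minimal sketch of that comparison only; how running agents are discovered and restarted is left out, and `agents_to_start` plus both sample inputs are hypothetical.

```python
from collections import Counter


def agents_to_start(desired, running_labels):
    """Return how many agents per label are missing relative to the desired counts."""
    running = Counter(running_labels)
    shortfall = {}
    for label, wanted in desired.items():
        missing = wanted - running.get(label, 0)
        if missing > 0:
            shortfall[label] = missing
    return shortfall


desired = {"etl": 2, "ml": 1}     # hypothetical contents of the config file
running_labels = ["etl"]          # hypothetical labels of agents currently alive
print(agents_to_start(desired, running_labels))
```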
  • k

    Kyle Flanagan

    12/23/2020, 7:52 PM
    New to prefect -- seems very cool. Would like to be able to store the exception when a task fails due to an unexpected exception. Unfortunately, the results are not getting stored when using LocalResult. Here's a MWE.
    import os
    os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'
    
    from prefect import task, Flow
    from prefect.engine.results import LocalResult
    
    @task(result=LocalResult(dir='/tmp/prefect-res'))
    def my_task():
        print("in task")
        raise RuntimeError("oh no!")
    
    with Flow('MyFlow') as flow:
        my_task()
        state = flow.run()
        print("Location is: >", state.result[flow.get_tasks()[0]]._result.location, "<")
    Result is:
    Location is: > None <
    If I
    return
    some value from
    my_task()
    the result gets stored, but exceptions do not.
    c
    • 2
    • 16
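One workaround that does not depend on Prefect's result handling is to record the exception in plain Python before it propagates. A minimal sketch, assuming a traceback written to a text file is enough; the decorator `record_exceptions` and the file naming are hypothetical, and how it interacts with `@task` is not verified here.

```python
import functools
import os
import tempfile
import traceback


def record_exceptions(directory):
    """Decorator: write the traceback of any exception to a file, then re-raise it."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                os.makedirs(directory, exist_ok=True)
                path = os.path.join(directory, func.__name__ + ".traceback.txt")
                with open(path, "w") as handle:
                    handle.write(traceback.format_exc())
                raise
        return wrapper
    return decorator


out_dir = tempfile.mkdtemp()


@record_exceptions(out_dir)
def my_task():
    raise RuntimeError("oh no!")


try:
    my_task()
except RuntimeError:
    pass

saved_path = os.path.join(out_dir, "my_task.traceback.txt")
print(saved_path)
```

Because the exception is re-raised, the caller still sees the failure; the file is only a side record.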
  • b

    Brett Naul

    12/23/2020, 8:52 PM
    very quick q: is flow version group actually deprecated like https://docs.prefect.io/orchestration/concepts/flows.html#versioning says? I don't see any references in the code to this deprecation. my use case is: I'd like to split out the "staging" flow runs (which run many times per day but are not very important) from the "prod" flow runs in the UI somehow, and
    version_group_id
    seems like the most appropriate way to do that...but I don't want to rely on it if it might go away
    c
    n
    • 3
    • 8
  • j

    jack

    12/23/2020, 9:39 PM
    Hey all - I recently ran into an issue where my newly registered flows would not run (even a simple hello world flow that works on a local server/agent); they fail with a different error message for each Prefect version: 0.13.17 =
    Failed to load Flow from C:\Users\JackChoi\.prefect\flows\hello-flow-jack-v2.prefect
    Traceback (most recent call last):
    File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 105, in get_flow
    return extract_flow_from_module(module_str=flow_location)
    File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 119, in extract_flow_from_module
    module = importlib.import_module(module_name)
    File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
    File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
    File "<frozen importlib._bootstrap>", line 991, in _find_and_load
    File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
    ModuleNotFoundError: No module named 'C'
    0.14.0 =
    {'_schema': 'Invalid data type: None'}
    Does anyone know what's happening here and how I could fix it? Cheers!
    m
    • 2
    • 4
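One plausible reading of the 0.13.17 traceback: the flow was stored under a Windows path but loaded by a Python under `/usr/local/lib`, where that path is not a file, so the location was handled as a module string and the text before the first colon was imported. The splitting rule below is an assumption made for illustration, not a confirmed description of Prefect's internals.

```python
import importlib

flow_location = r"C:\Users\JackChoi\.prefect\flows\hello-flow-jack-v2.prefect"

# Assumed parsing rule: everything before the first ":" is taken as the module name.
module_name = flow_location.split(":", 1)[0]
print(module_name)

error_message = None
try:
    importlib.import_module(module_name)
except ModuleNotFoundError as exc:
    error_message = str(exc)
print(error_message)
```

Under that assumption, the drive letter becomes the module name, which would match the `No module named 'C'` line in the traceback.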
  • m

    mithalee mohapatra

    12/24/2020, 1:41 AM
    Has anyone faced an issue mapping tasks that use an SFTP connection with LocalDaskExecutor()? I see that the process hangs when it hits the mapped tasks. The mapped tasks work fine with a Dask cluster and with plain flow.run. The same workflow works fine for an FTP connection, though.
    d
    k
    b
    • 4
    • 26
  • a

    abhilash.kr

    12/24/2020, 8:30 AM
    Hello All, Excited to share - Prefect Duck has arrived at our home!! Thanks to Prefect
    :marvin-duck: 4
    :marvin: 5
  • a

    ale

    12/24/2020, 9:07 AM
    Hey Prefect Team, how do I get some of your cool swag?! 🙂
    d
    j
    f
    • 4
    • 5
d

Dylan

12/24/2020, 3:25 PM
We’ll be in touch 😄
j

jspeis

12/24/2020, 4:30 PM
I wouldn’t mind some either 😁
d

Dylan

12/24/2020, 5:54 PM
@jspeis You betcha 💯
🙌 1
f

faris ALsaleem

12/28/2020, 8:46 AM
@Dylan are you guys still giving out swag?
d

Dylan

12/28/2020, 6:06 PM
@faris ALsaleem you betcha