prefect-community
  • h

    Hugo Polloli

    09/14/2021, 7:35 AM
Hi, I'm building a state handler and I'm wondering if it's possible to attach information to the task or state myself, so that it will be available inside the handler. Ideally I'd be able to do something like this:
    @task(state_handlers=[my_handler])
    def my_task(my_input):
        self.message = f"Input {my_input} wasn't ok"
        assert input_is_ok(my_input)
    
        self.message = f"Failed to do_some_things for {my_input}"
        res = do_some_things(my_input)
    
        self.message = f"Failed to do_a_few_more for {my_input}"
        res2 = do_a_few_more(res)
    
        return res2
I understand I could use signals, using the value property of a signal to store what I want to print, but that would mean wrapping each line where I perform an action in a try/except block and raising the signal inside the except block. I've done that and it gets really cluttered. Maybe I missed something obvious and am going in the wrong direction, though...
    k
    2 replies · 2 participants
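    A minimal sketch (assuming the Prefect 1.x state-handler and signals API) of the signal-based approach described above; input_is_ok and do_some_things are the question's own placeholders, and the handler simply reads the message carried by the failed state:
    import prefect
    from prefect import task
    from prefect.engine.signals import FAIL

    def my_handler(task_obj, old_state, new_state):
        if new_state.is_failed():
            # the text passed to FAIL ends up on the resulting state as its message
            prefect.context.get("logger").warning(new_state.message)
        return new_state

    @task(state_handlers=[my_handler])
    def my_task(my_input):
        if not input_is_ok(my_input):
            raise FAIL(f"Input {my_input} wasn't ok")
        return do_some_things(my_input)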
  • i

    Issam Assafi

    09/14/2021, 7:43 AM
Hello, what if, in the same flow, I need a different version of Python for each task, or different versions of some packages for each task... is this something Prefect can handle, and how?
    k
    1 reply · 2 participants
  • s

    shekhar koirala

    09/14/2021, 8:01 AM
My code is as simple as this:
    import datetime

    import prefect
    from prefect import Flow, Parameter, task

    @task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
    def fetch_data(count, check):
        data = []
        logger = prefect.context.get("logger")
        logger.info(data)
        return data


    def run():
        with Flow("fetch-data") as local_flow:
            count = Parameter("count", default=100)
            check = Parameter("limit", default=10)
            fetch_data(count, check)

        local_flow.register(project_name="test")
        state = local_flow.run()
        print(state)


    if __name__ == "__main__":
        run()
    k
    2 replies · 2 participants
  • l

    Lucas Beck

    09/14/2021, 8:20 AM
    Hi everyone, my colleagues and I recently started migrating our internal tool to Prefect, in order to see if it could support our use cases. So far we have had a generally good experience, especially with the simpler use cases. Recently, though, we hit a few walls that have been hard to overcome. We have tried to go through the documentation and search this Slack channel without much luck, so I am writing here to see if we can get some help. Let me quickly describe our overall setup, followed by the challenges/blockers, to see if someone has any suggestions. Please also share your thoughts on the way we organize things; perhaps a different way of organizing could help.

    Our setup and use cases: We have various different ETL-like jobs to run, some on a schedule and some on demand. We usually break down big computing tasks into smaller parts, each contained in its own docker container. These tasks, when combined, form a computation DAG. This DAG can mostly be parallelized (mapped). We then leverage an Azure Kubernetes Service (AKS) cluster to horizontally scale the compute jobs into hundreds, sometimes thousands, of containers. So far we have been using our own internal tool to orchestrate tasks and manage dependencies, and that is what we are currently investigating whether we can replace with Prefect. Our Prefect setup uses a self-hosted server instead of the cloud solution.

    Challenge 1: The first and most important challenge we are facing concerns scaling jobs. I will use the flow we tried to migrate to Prefect as the example here. I am attaching the DAG schematic as a picture. In this flow, each task uses Prefect's RunNamespacedJob to spin up a new job that has exactly one pod, which will perform a given computation. We then use other functions from the Prefect SDK to read the logs after completion and delete the Kubernetes jobs. The whole DAG can be run in parallel (mapped) for multiple inputs. Usually the job works fine for 3-5 inputs, but as soon as we try to scale to as little as 30 inputs we start seeing a ton of heartbeat errors or "no pod status" errors (also attaching an image here). Running with our internal tool, we have already scaled up to around 500 inputs in parallel in the same cluster setup. Has anyone else experienced this? By the way, we are using the DaskExecutor for that with default parameters.

    Challenge 2: We currently have a single repo where we want to keep all Prefect flows under version control. In order to reuse code, we also made it a package that can be imported around. The structure looks like this:
    prefect-flows/
      setup.py
      requirements.txt
      prefectflows/
        flows/
          flow1.py
          flow2.py
          ...
        core/
          kubernetes.py
          ...
    The idea is that the actual flows sit under the flows folder and can import boilerplate/utils from the core folder. For instance, kubernetes.py contains all functions that use Prefect's SDK to interact with Kubernetes. An example function is run_job, which will run a job, read the logs and eventually delete that job. This all works fine when running locally, but fails once we register a flow and try to run it through the server. The error is:
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'prefectflows.core\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    It is important to mention that I have dockerized the package using the prefect base image, and I am telling prefect to run the flow with that image. Something like the following:
    Flow(
            flow_name,
            storage=Azure(
                container=container,
                stored_as_script=False,
                connection_string=azure_connection_string,
            ),
            run_config=KubernetesRun(
                image="imagewithourpackageinstalled:latest",
    ...
Maybe I misunderstood where the unpickling is happening, but I would expect that if the flow runs with an image that has all the packages that were used when registering the flow, then it should work. I think that summarizes it, but please let us know if you need further description or for us to share more of the code. Thanks in advance for your help!
    d
    k
    20 replies · 3 participants
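    One approach that is often suggested for this class of unpickling error, offered here only as a hedged sketch and not as this thread's confirmed resolution: store the flow as a script instead of a pickle, so imports are resolved at run time inside the KubernetesRun image (which already has the prefectflows package installed). The container and blob_name values below are illustrative assumptions, and azure_connection_string is the variable from the snippet above:
    from prefect import Flow
    from prefect.run_configs import KubernetesRun
    from prefect.storage import Azure

    flow = Flow(
        flow_name,
        storage=Azure(
            container=container,                  # as in the snippet above
            blob_name="flows/flow1.py",           # assumption: where the flow script lives
            stored_as_script=True,
            connection_string=azure_connection_string,
        ),
        run_config=KubernetesRun(image="imagewithourpackageinstalled:latest"),
    )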
  • n

    Nadav Nuni

    09/14/2021, 8:21 AM
    hey people, I'm having some issues with a specific use case: I have a large number (e.g. 10000) of objects which I want to perform an operation on in separate tasks; each task has the overhead of loading external data, starting a pod, etc... so I want to perform this operation in chunks (e.g. 100 objects per task). The problem is that passing a mapped array of arrays doesn't really work for me, and I get this (attached). Is there a way to mark the inner lists as unmapped? (code added in a reply)
    k
    3 replies · 2 participants
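    A hedged sketch of the chunking pattern being described, assuming Prefect 1.x mapping; whether it resolves the specific error in the attachment isn't established here, and the task names and default objects are illustrative:
    from prefect import Flow, Parameter, task

    @task
    def chunk(objects, size=100):
        # one inner list per mapped child task
        return [objects[i:i + size] for i in range(0, len(objects), size)]

    @task
    def process_chunk(batch):
        # each mapped run receives one list of up to `size` objects
        for obj in batch:
            ...  # per-object work goes here

    with Flow("chunked-processing") as flow:
        objects = Parameter("objects", default=list(range(10000)))
        process_chunk.map(chunk(objects))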
  • m

    Marko Herkaliuk

    09/14/2021, 8:50 AM
Hello everybody. I have a flow that calls create_flow_run (a few times) and waits for them to finish with wait_for_flow_run. But if one of the sub-flows is Failed, the parent flow gets a Success state anyway. Can I make the parent flow Failed if one of the children fails?
    k
    n
    +1
    20 replies · 4 participants
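    A hedged sketch of one way to do this with the Prefect 1.x flow-run tasks: check each child's final state and raise FAIL in the parent if any child failed. The flow and project names are placeholders:
    from prefect import Flow, task
    from prefect.engine.signals import FAIL
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    @task
    def raise_if_failed(flow_run_view):
        if flow_run_view.state.is_failed():
            raise FAIL(f"Child run {flow_run_view.flow_run_id} failed")

    with Flow("parent-flow") as flow:
        child_id = create_flow_run(flow_name="child-flow", project_name="test")
        child_view = wait_for_flow_run(child_id)
        raise_if_failed(child_view)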
  • m

    Maikel Penz

    09/14/2021, 9:16 AM
Hey! I'd like to share an issue I'm facing with Prefect that someone who uses Kubernetes might be able to help with. I'm registering workflows using Docker storage, and the registration + build of the image happens through a CI/CD pipeline that runs on Kubernetes (AWS EKS). The environment: as you saw above, my CI/CD environment is containerised (a pod on Kubernetes). This means that when registering a flow, the prefect register API is building a container inside a container. The issue: the Prefect register command (which builds the image) fails to pull libraries from PyPI, throwing connectivity issues (DNS resolution). The Kubernetes pod has internet access + DNS working, as I can pull the prefect library from PyPI, for example. But for some reason the registration command where the image is built throws this:
    WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7ffb221c2160>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')': /simple/pip/
    Note the "Temporary failure in name resolution". In summary: I'm not sure how much this is a Kubernetes issue versus a Prefect issue. I'm throwing it here as there could be a tweak on the Prefect side we could make to get it working. Any ideas?
    j
    k
    +1
    9 replies · 4 participants
  • a

    Abhishek

    09/14/2021, 12:02 PM
Hello folks, I have a question regarding Prefect Cloud flow/task execution and the cloud's IP range. Here is what my current flow does: • a single flow with multiple tasks to sync a few S3 buckets (each task runs an s3 sync command via ShellTask) • catch-22: the source S3 bucket is in a different account (let's say X) and the destination bucket is in a different account (let's say Y). The AWS policies are already in place to do cross-account S3 sync (the IAM role/user is already added to the access policy). Issue: • I need the IP address/IP range of the network where this S3 task execution happens so that we can whitelist that range in the source bucket account's (X) policy. I am running my task on Prefect Cloud - which means the request to execute the sync command goes out from the Prefect Cloud network. Is there any way I can get the IP range of Prefect Cloud where this execution happens? I need to whitelist that IP range. If I run Prefect Server on my own infra it becomes easy for me to whitelist the IP range of the AWS VPC in which the server instance runs, but with Prefect Cloud I don't have clarity. cc: @Kevin Kho
    ✅ 1
    n
    k
    10 replies · 3 participants
  • c

    Chris McPherson

    09/14/2021, 1:53 PM
Hello Prefect community! I'm working on a machine learning team that hopes to use Prefect to manage ML feature pipelines and model training/ops. We're just getting started. If there are any other data science or ML practitioners further along in their journey with Prefect, we'd love some advice or "I wish I had known/started with"s 👍
    k
    2 replies · 2 participants
  • c

    Constantino Schillebeeckx

    09/14/2021, 4:45 PM
    Could I get some suggestions on how to debug a flow which has been submitted for execution, but doesn't actually start? I've confirmed: • the agent (ECS) is querying for runs • the flow is tagged properly for the agent to pick it up • cloudwatch tells me the agent properly picks up the flow
    k
    3 replies · 2 participants
  • k

    Kathryn Klarich

    09/14/2021, 5:41 PM
    Is there any way to set the max_retries on a task from the UI (or without having to upload new code)? I was hoping I could template the value and pass it to the @task decorator, but it doesn't look like you can use templates with the max_retries argument.
    k
    8 replies · 2 participants
  • v

    Vasudevan Balakrishnan

    09/14/2021, 6:10 PM
I'm new to Prefect. I have a scenario and I can't seem to find a way in Prefect to do it. I have 3 flows - A, B, C - all running on their own schedules. Flows B and C have an indirect dependency on A (the latest run of A should be completed and not failed). I was trying to check the status of the latest flow run using FlowRunView (which takes flow_run_id as a parameter) inside flows B and C. Is there an API to get the latest flow_run_id for a flow (using its name)? Is there any other workaround?
    k
    3 replies · 2 participants
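    A hedged sketch of one way to look up the latest run of a flow by name, assuming the Prefect 1.x GraphQL schema and Python client; "flow-A" is a placeholder, and ordering by start_time deliberately skips scheduled runs that haven't started yet:
    from prefect.client import Client

    client = Client()
    result = client.graphql(
        """
        query {
          flow_run(
            where: {flow: {name: {_eq: "flow-A"}}}
            order_by: {start_time: desc_nulls_last}
            limit: 1
          ) {
            id
            state
          }
        }
        """
    )
    latest = result["data"]["flow_run"][0]
    print(latest["id"], latest["state"])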
  • j

    Joe Schmid

    09/14/2021, 6:35 PM
    We're a bit behind on Prefect versions (0.13.19) because we've been using Environments (especially DaskCloudProviderEnvironment) so we're ramping up on Run Configs. With Run Configs can we: 1. Examine the parameters of a Flow run at the start of execution, calculate the number of Dask workers based on a parameter value, and create an ephemeral Dask cluster to run that Flow on? 2. Alternatively, associate a Run Config with a schedule, e.g. Run this Flow at midnight with 12 workers and run it again at 4am with 20 workers, etc.
    k
    g
    8 replies · 3 participants
  • l

    Lucas Hosoya

    09/14/2021, 6:56 PM
Hi, I'm trying to run a PySpark script inside my Flow, but it gives me the error below. I'm using a Local Agent and have already tried changing environment paths, but it still gives me the same error after I register -> "Quick Run". Does anybody know how to deal with PySpark scripts (not Databricks) in Prefect?
    Exception: Java gateway process exited before sending its port number
    k
    z
    +1
    42 replies · 4 participants
  • s

    Sean Talia

    09/14/2021, 9:34 PM
Is there a canonical way, or a canonical task that folks use, for writing arbitrary values to flow logs? Like if I were to have a (built-in) task whose output I'd like to log somewhere, do people generally just write a trivial task that looks something like:
    from prefect.utilities.logging import get_logger
    from prefect import task
    
    @task
    def log_value(val: str) -> None:
        logger = get_logger()
        logger.info(val)
    k
    k
    8 replies · 3 participants
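    A hedged side note on the snippet above: inside a task body, the logger carried in prefect.context is also commonly used in Prefect 1.x, since it picks up the flow-run logging configuration:
    import prefect
    from prefect import task

    @task
    def log_value(val: str) -> None:
        prefect.context.get("logger").info(val)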
  • a

    Abhas P

    09/14/2021, 10:28 PM
Hi all, for a Prefect flow to pass credentials on to mapped tasks - can I pass a dictionary of credentials, or should it be split into individual values?
    with Flow("example") as flow:
    
      credentials = {'id': ... , 'pass': ...}
      apply_map(transform ,result ,unmapped(credentials)) # can we do this ?
      apply_map(transform ,result ,id =unmapped(credentials['id']), pass =unmapped(credentials['pass'])) # or is this the standard way?
    k
    7 replies · 2 participants
  • k

    Kathryn Klarich

    09/14/2021, 10:55 PM
I have a Docker image with a specific version of Prefect installed that I am using as the base image in my Storage object; however, when I go to register the flow, a different version of Prefect gets installed (i.e. the one I have installed locally where I'm calling flow.register() from). Is there any way to avoid this? I know you can specify the Prefect version when you create the Storage object, but then it is up to the user creating the flow to know what version to specify, and I would prefer to just use the version that is already installed on the base image (as this image can be shared across users / flows).
    k
    16 replies · 2 participants
  • b

    Bastian Röhrig

    09/15/2021, 9:35 AM
    Hey everyone, I am having a small issue with dynamically named runs and automations in Prefect Cloud: I have a flow with a state_handler that uses the set_flow_run_name method on the Prefect client to rename itself when it moves to the Running state. Now, I have set up automations in Prefect Cloud that alert me when a flow run fails. I am experiencing the following behaviour: if a flow run fails within a few minutes (approx. 5-10 minutes), I see the Prefect-generated run name (e.g. neat-kudu) in the alert. If the flow run fails after a longer time (50 minutes), I see my custom flow run name in the alert. Is this expected? Is there anything I can do on my side to fix it?
    k
    5 replies · 2 participants
  • j

    Jérémy Trudel

    09/15/2021, 4:00 PM
    Hi! I'm having an issue on Prefect Cloud where a flow appears as failed, despite no tasks having returned an error. The weirdest thing is the flow lasted 2 minutes, but one task is still marked as ongoing (blue) with its timer going up. What I don't understand is if there is still an active task, wouldn't the flow also be active past 2 minutes?
    k
    d
    3 replies · 3 participants
  • q

    Qin XIA

    09/15/2021, 4:34 PM
I use Bitbucket storage for my flow. Does anyone know whether the local agent will download all the .py files (my own function lib) or only the file that defines the flow, when I run the flow with a local agent? Thanks.
    k
    3 replies · 2 participants
  • l

    Luca Schneider

    09/15/2021, 6:34 PM
Hi there, I'm new to Prefect and GraphQL. I'm trying to trigger a flow based on the flow name. Is there a way to chain the following GraphQL operations, or do I need to send the requests consecutively? Thanks
    query GetLatest {
      project(where: {name: {_eq: "project-test"}}) {
        flows(
          where: {name: {_eq: "flow-test"}, archived: {_eq: false}}
          order_by: {version: desc}
          limit: 1
        ) {
          id
        }
      }
    }
    
    mutation TriggerLatest {
      create_flow_run(input: {flow_id: "xxx"}) {
        id
      }
    }
    k
    9 replies · 2 participants
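    A hedged sketch of the consecutive-request approach, assuming the Prefect 1.x Python client; since the mutation needs the id returned by the query, the two calls are simply made back to back:
    from prefect.client import Client

    client = Client()
    result = client.graphql(
        """
        query GetLatest {
          project(where: {name: {_eq: "project-test"}}) {
            flows(
              where: {name: {_eq: "flow-test"}, archived: {_eq: false}}
              order_by: {version: desc}
              limit: 1
            ) {
              id
            }
          }
        }
        """
    )
    flow_id = result["data"]["project"][0]["flows"][0]["id"]
    flow_run_id = client.create_flow_run(flow_id=flow_id)
    print(flow_run_id)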
  • d

    Daniel Manson

    09/15/2021, 8:01 PM
Hi - this is quite a basic question I think, but I haven't found a clear answer so far... Is the recommendation to run Prefect flows against a shared filesystem, or is the only persistence between tasks supposed to be explicit use of S3 etc.? Some hints that a shared filesystem is expected are the filesystem tasks and the multiple mentions of EFS in the (deprecated) Fargate Agent; plus I see that, according to AWS themselves, ECS (which Prefect now uses instead of raw Fargate) is intended to have EFS plugged in. However, there are a few mentions of files not being persisted between tasks, including this Dask page. In general it seems that a lot of tasks will be a bit painful/impossible to use if you don't have a shared filesystem, unless you want to pass large files through Prefect task return values. Thanks!
    k
    8 replies · 2 participants
  • a

    Abhas P

    09/15/2021, 9:25 PM
Hi there, I am trying to run a set of 3 dependent flows meant to run sequentially (A->B->C). Is there a way I can have both a schedule for this flow (as stated here) and also a manual trigger for the said parent flow? What options do I have to manually trigger the 'parent-flow' apart from the Prefect UI and GraphQL?
    k
    1 reply · 2 participants
  • n

    Nadav

    09/15/2021, 9:45 PM
Hi, is there a way to use a dictionary returned from a task inside the with Flow block? I read in some threads that it can be done using an intermediate task to unpack it - any examples of such a task?
    k
    6 replies · 2 participants
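    A hedged sketch of the intermediate-task pattern mentioned above; the task names and dictionary keys are illustrative:
    from prefect import Flow, task

    @task
    def make_config():
        return {"bucket": "my-bucket", "prefix": "data/"}

    @task
    def get_key(d, key):
        # defer dictionary access to runtime, when the upstream result actually exists
        return d[key]

    @task
    def download(bucket, prefix):
        ...

    with Flow("unpack-dict") as flow:
        cfg = make_config()
        download(get_key(cfg, "bucket"), get_key(cfg, "prefix"))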
  • j

    Jason Kim

    09/15/2021, 11:07 PM
In order to connect an on-prem Hadoop cluster to Prefect Cloud, I tried to add the Prefect API host to the file /etc/hosts:
## prefect API
34.96.72.220 api.prefect.io
PING appears to work, while telnet does not.
PING api.prefect.io (34.96.72.220) 56(84) bytes of data.
telnet api.prefect.io
Trying 34.96.72.220...
telnet: connect to address 34.96.72.220: Connection timed out
Can anyone provide any tips on how to allow communication between the hadoop node and api.prefect.io?
    z
    1 reply · 2 participants
  • j

    Jacob Blanco

    09/16/2021, 12:30 AM
    Hi, I was wondering if anyone has any great ideas for how to deal with cancellation of long-running queries started from a Prefect task. In my case I'm dealing with PostgreSQL, and while psycopg2 will try to gently stop a query when you close a connection, in many cases the reason we are cancelling a flow/task is because that query has been running for too long and is unlikely to finish naturally in an acceptable timeframe. Instead of having to manually kill the query with pg_terminate_backend, it would be nice if the task being cancelled could call pg_terminate_backend itself.
    k
    3 replies · 2 participants
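    A hedged sketch (not from this thread) of one defensive pattern: capture the backend PID up front and terminate it in a finally block, so the server-side query is cleaned up whenever the task body exits early. Whether the finally block runs on a given cancellation path is not guaranteed, so this is a sketch rather than a definitive solution; dsn and sql are placeholders:
    import psycopg2
    from prefect import task

    @task
    def run_long_query(dsn, sql):
        conn = psycopg2.connect(dsn)
        backend_pid = None
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT pg_backend_pid()")
                backend_pid = cur.fetchone()[0]
                cur.execute(sql)
                return cur.fetchall()
        finally:
            conn.close()
            if backend_pid is not None:
                # terminate the server-side process in case the query is still running
                with psycopg2.connect(dsn) as admin:
                    with admin.cursor() as cur:
                        cur.execute("SELECT pg_terminate_backend(%s)", (backend_pid,))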
  • j

    Jacob Blanco

    09/16/2021, 12:35 AM
    Also I had a look at the repos and couldn’t find this feature request, but we rely heavily on parameterized flow runs and it would be great to be able to filter runs in the Cloud dashboards by parameters used.
    k
    1 reply · 2 participants
  • m

    Martim Lobao

    09/16/2021, 12:54 AM
    hey, so it looks like a dependent flow run has just disappeared… it’s not archived, just gone. 😕 it was a dependent flow from a flow of flows. the link to the flow run doesn’t load anything anymore, it just brings up a blank page. it’s also not in the list of runs for the flow, nor in the dashboard with the history of every flow run.
    k
    z
    13 replies · 3 participants
  • c

    CA Lee

    09/16/2021, 3:30 AM
    Hello, is there any way to stop flows from running on certain dates?
    n
    9 replies · 2 participants
  • z

    Zac Chien

    09/16/2021, 7:56 AM
Hi, I use the Python threading package in a Prefect task. The target function works properly, but the context logger inside the target function doesn't show up in the UI logs. Is there any way to capture the log in Prefect?
    z
    j
    17 replies · 3 participants
z

Zach Angell

09/16/2021, 1:25 PM
Hey @Zac Chien - logging from within threads / subprocesses can be tricky. Could you share a bit more about how you're using the threading package?
z

Zac Chien

09/17/2021, 8:58 AM
@Zach Angell Thanks for your message! Today I am busy with weekly meetings; I will reply to you later! Have a good day! 😀
👍 1
Sorry for the late reply! We have a Prefect task named wit-labeling which accepts a list of label names as input. We also have a Python function consisting of three steps (calls to boto3 S3 and Athena functions) for a single piece of labeling logic. In wit-labeling, threading is used to speed things up; meanwhile, we want to know the details of each labeling step, including for debugging when a downstream job doesn't work properly. We are in the development stage and have found that we rely heavily on the logs. 🥺
z

Zach Angell

09/20/2021, 5:15 PM
Gotcha, any chance you can share a minimal version of the threading code in wit-labeling?
j

Jessica Smith

09/21/2021, 4:05 PM
FYI - I am running into this same kind of issue. I'm importing a pool from multiprocessing.dummy and using pool.map to execute multiple threads. None of the logs make it to the UI.
z

Zach Angell

09/21/2021, 4:23 PM
Sorry to hear that @Jessica Smith, would you share a minimal example of how you're using multiprocessing pool and logging?
j

Jessica Smith

09/21/2021, 4:25 PM
This is how we're running the code in parallel:
with ThreadPool(20) as pool:
    results = pool.map(run_extract_process, records)
Then in run_extract_process I'm just trying to capture the input variable, so I know which one is which:
logger.info(f"Starting extract for {v}")
And again, ThreadPool is from multiprocessing.dummy:
from multiprocessing.dummy import Pool as ThreadPool
The missing logs don't seem to make it past this line in logging.py:
# if its not during a backend flow run, don't emit
if not context.get("running_with_backend"):
    return
Sure enough, if I change my code to add a line changing the context, it shows up in the UI:
with prefect.context({"running_with_backend": True}):
    logger.info(f"Starting extract for {v}")
z

Zach Angell

09/21/2021, 8:35 PM
Ahhhh gotcha, that makes a lot of sense. prefect.context is thread-local, so the context is lost when running in a thread pool.
I suspect you could pass prefect.context to run_extract_process as an arg and then re-initialize it in run_extract_process:
def run_extract_process(record, prefect_context):
    with prefect.context(prefect_context):
        # ... everything else
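A slightly fuller sketch of this suggestion, assuming the ThreadPool code from earlier in the thread lives inside a task; the task name extract_all is illustrative, while run_extract_process comes from the messages above:
import prefect
from prefect import task
from multiprocessing.dummy import Pool as ThreadPool

def run_extract_process(record, prefect_context):
    # re-initialize the flow-run context inside the worker thread so the
    # context logger emits to the backend as usual
    with prefect.context(prefect_context):
        prefect.context.get("logger").info(f"Starting extract for {record}")
        # ... actual extract logic for `record` goes here

@task
def extract_all(records):
    ctx = dict(prefect.context)  # capture the context in the task's main thread
    with ThreadPool(20) as pool:
        return pool.map(lambda r: run_extract_process(r, ctx), records)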
j

Jessica Smith

09/22/2021, 2:21 PM
I'll try that. To confirm, is this something that is working the way it is designed, and no changes will be made to allow multithreading without resorting to passing around the context?
z

Zach Angell

09/22/2021, 2:36 PM
Correct. If you use custom threading logic within a task, we have no way of passing the necessary context to your new threads. That will have to be done with custom code. When using Prefect executors to execute tasks using multithreading, Prefect will take care of this for you.
👍 1
j

Jessica Smith

09/22/2021, 2:37 PM
Perfect, that is exactly what I needed to know. Thanks!
👍 1