prefect-community
  • s

    Sean Harkins

    04/17/2021, 1:43 AM
    I am investigating how we can aggregate module level logs from our Dask workers in Prefect Cloud. I had discussed this a bit with @Jim Crist-Harif in this thread https://prefect-community.slack.com/archives/CL09KU1K7/p1617040250140900?thread_ts=1616957545.069800&cid=CL09KU1K7 where he mentioned that configuring this was somewhat complex. The extra loggers documentation section https://docs.prefect.io/core/concepts/logging.html#extra-loggers demonstrates how to configure module level loggers to use Prefect’s CloudHandler via environment settings. I used the appropriate environment settings in my run_config with
    flow.run_config = ECSRun(
        image=worker_image,
        labels=["dask_test"],
        task_definition=definition,
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": "pangeo_forge.recipe"}
    )
    I have also configured the task’s regular Python logging with
    logging.getLogger("pangeo_forge.recipe").setLevel(level=logging.DEBUG)
    and this logger is successfully writing to my Cloudwatch logs for the Dask worker. But none of this module’s log entries are being written to Prefect Cloud. Any suggestions on how I should configure things so that my Dask workers’ log streams are also written to Cloud?
    k
    • 2
    • 12
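    For reference, a minimal sketch of the extra-loggers setup described above, with the flow body reduced to a placeholder task; note that the logging docs format PREFECT__LOGGING__EXTRA_LOGGERS as a string-wrapped list of logger names, which may be worth double-checking against the plain string used above:
    import logging

    from prefect import Flow, task
    from prefect.run_configs import ECSRun

    @task
    def recipe_step():
        # This module-level logger is the one named in EXTRA_LOGGERS below,
        # so its records should be forwarded to Cloud once registered.
        logging.getLogger("pangeo_forge.recipe").info("running recipe step")

    with Flow("extra-loggers-sketch") as flow:
        recipe_step()

    flow.run_config = ECSRun(
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe']"}
    )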
  • j

    Julio Venegas

    04/17/2021, 4:09 PM
    Hey! How can I properly report a bug? I used the new
    prefect register --project PROJECT_NAME --path PATH/TO/DIR_WITH_N_FLOWS
    and even though all flows in the dir are registered and showing in the project, when I try to run them, the agent does not pick them up. If I register them individually with
    cd PATH/TO/DIR_WITH_N_FLOWS
    python flow1.py
    python flow2.py
    and try to run them, they work, i.e. the agent picks them up. Where do I get the appropriate logs, and where do I share what I find?
    k
    • 2
    • 1
  • h

    Hawkar Mahmod

    04/17/2021, 7:08 PM
    If I wish to inject values into my flow at run time from the environment (env vars), where should this occur? Should it be at the start of my flow using a Context context manager? These are values such as the application env (prod, dev, staging) and particular business-logic values that are not secrets. I tried to create a user-defined config object and then have my tasks read from it, but then realised that this object was being created at flow build time, so the values were being frozen to what I had either locally when I registered my flow or in my build environment.
    m
    k
    • 3
    • 4
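    A common pattern for the question above, sketched under the assumption that the values are exposed as environment variables in the run environment (APP_ENV is a made-up name): read them inside a task so they resolve at run time rather than being frozen at registration.
    import os

    from prefect import Flow, task

    @task
    def get_app_env() -> str:
        # Evaluated when the task runs in the agent's environment,
        # not when the flow is built and registered.
        return os.environ.get("APP_ENV", "dev")

    @task
    def do_work(app_env: str):
        print(f"running against {app_env}")

    with Flow("runtime-config-sketch") as flow:
        do_work(get_app_env())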
  • a

    Aurélien Vallée

    04/18/2021, 6:00 AM
    Is there a way to dynamically change the properties of the flow executor? I have a specific situation where I run, say, 30 tasks in parallel (using mapping) to retrieve data, but the data provider can reply that the API is overused, in which case I need to reduce the number of workers in the Dask executor to 1, otherwise the provider will ban the IP. I could handle that somehow internally to the download task with threading.Lock, but I guess that would only work for the thread-based local Dask executor. Can it be handled at the Prefect flow or task level?
    k
    • 2
    • 4
  • a

    Avi Haiat

    04/18/2021, 4:44 PM
    Hi community, I am new to Prefect and trying to wrap my head around the framework. I have a simple ETL flow that I need to implement: reading a large collection from MongoDB, transforming it, and saving the result back to a MySQL table. The collection is too big to fit in memory, so I need to do some sort of batching, but I am struggling to understand how. I was thinking of fixing a LIMIT constant, calculating the count of the collection, and producing map tasks over a
    range(0,count,LIMIT)
    pseudo code:
    from prefect import Flow, task, unmapped, flatten

    LIMIT = 1000  # batch size

    @task
    def count_collection_mongo(collectionName):
        # calculate number of rows in the collection and return it
        return count

    @task
    def load_data(collectionName, skip, limit):
        # data = db.collectionName.find().skip(skip).limit(limit)
        return data

    @task
    def transform(data):
        print(data.get('name'))
        return data

    @task
    def calculate_iterations_data(count_result):
        res = range(0, count_result, LIMIT)
        return res


    with Flow("ETL flow for mycollection") as flow:
        result_count = count_collection_mongo("mycollection")
        iterations = calculate_iterations_data(result_count)
        # map over the skip offsets; the collection name and limit stay constant
        data = load_data.map(unmapped("mycollection"), iterations, unmapped(LIMIT))
        transform.map(flatten(data))

    flow.run()
    k
    • 2
    • 5
  • a

    Avi Haiat

    04/18/2021, 4:45 PM
    Is this the correct way to tackle the issue? How can I make sure that memory will not blow up, given that each load_data call will hold its result in memory, and how can I control the number of parallel transform tasks that can run?
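    On the parallelism part of the question above, one knob (sketched here under the assumption that a local Dask executor is in use; it does not address the memory question) is the executor's worker count, which caps how many mapped tasks run concurrently:
    from prefect.executors import LocalDaskExecutor

    # Illustrative number: at most 4 mapped tasks run at the same time.
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)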
  • v

    Varun Joshi

    04/19/2021, 3:56 AM
    My flow worked fine until I added a GCP Data Catalog and BigQuery piece to my code, which helps me fetch some column names from the Data Catalog and update some data in BigQuery. Once I added the Data Catalog piece and BigQuery client:
    from google.cloud import datacatalog_v1
    from google.cloud import bigquery
    
    client = bigquery.Client()
    datacatalog_client = datacatalog_v1.DataCatalogClient()
    I get an error saying: "Clients have non-trivial state that is local and unpickleable". Can someone tell me what this means? I'm assuming it has something to do with GCP. I'm using a local service account JSON file and was also importing Pub/Sub modules before, and I didn't have any issue then.
    k
    • 2
    • 11
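    For context, that error usually comes up when client objects created at module level get serialized along with the flow; a minimal sketch of the common workaround, assuming credentials are available where the task runs, is to construct the clients inside the task so they are created fresh at run time:
    from prefect import Flow, task

    @task
    def fetch_column_names():
        # Build the clients inside the task so they are never pickled
        # with the flow itself.
        from google.cloud import bigquery, datacatalog_v1

        bq_client = bigquery.Client()
        dc_client = datacatalog_v1.DataCatalogClient()
        # ... use the clients here ...

    with Flow("bq-client-sketch") as flow:
        fetch_column_names()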
  • r

    Rob Fowler

    04/19/2021, 11:46 AM
    I have all my flows converted from Prefect Core to running on Prefect Server, and coming back to getting some parallelism, I thought that this:
    from time import sleep
    
    import prefect
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor, DaskExecutor
    from prefect.storage import Docker
    
    
    @task
    def slow_task(item, sleep_time=4):
        logger = prefect.context.get('logger')
        <http://logger.info|logger.info>(f"==== IN TASK item {item} sleeping {sleep_time}")
        sleep(sleep_time)
        <http://logger.info|logger.info>(f"## Awake {item}")
        return item
    
    
    @prefect.task
    def produce_range(rangelen):
        return range(rangelen)
    
    
    with Flow("parallel") as flow:
        nrange = produce_range(6)
        numbers = slow_task.map(item=nrange)
    
    flow.executor = LocalDaskExecutor(workers=10, scheduler="processes")
    #flow.executor = DaskExecutor()
    
    if 'local_script_path' in prefect.context:
        flow.storage = Docker(registry_url='old7.mianos.com:5000')
    
    
    if __name__ == '__main__':
        flow.run()
    would start 10 processes inside the running Docker container, returning after a few seconds, but it executes each task serially unless I run it without any arguments to LocalDaskExecutor. Under k8s, DaskExecutor seems to start a few threads, but nowhere near what I want (30+). I feel I am missing something.
    n
    • 2
    • 9
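    One detail worth checking in the snippet above, offered as an assumption rather than a confirmed fix: dask's local schedulers take num_workers rather than workers as the keyword controlling pool size, so the call might look like this:
    from prefect.executors import LocalDaskExecutor

    # num_workers (not workers) is the kwarg dask's local schedulers understand.
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=10)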
  • m

    Milly gupta

    04/19/2021, 1:24 PM
    Hi All, is anyone using Module storage option?
    k
    f
    m
    • 4
    • 14
  • j

    Jack Sundberg

    04/19/2021, 3:37 PM
    Hey everyone, is there a good way to access flow-run metadata within a flow context? For example, a task that grabs the current flow-run-id or even an upstream task's duration. Or will pulling metadata need to be done in a separate flow?
    k
    • 2
    • 5
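    For the flow-run-id part of the question above, a minimal sketch using prefect.context inside a task (an upstream task's duration would still need a separate API query, which isn't shown):
    import prefect
    from prefect import Flow, task

    @task
    def show_run_metadata():
        # prefect.context carries run metadata such as flow_run_id and flow_id
        # while the flow is executing against a backend.
        logger = prefect.context.get("logger")
        logger.info("flow_run_id=%s", prefect.context.get("flow_run_id"))
        logger.info("flow_id=%s", prefect.context.get("flow_id"))

    with Flow("context-metadata-sketch") as flow:
        show_run_metadata()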
  • j

    Jonathan Chu

    04/20/2021, 12:13 AM
    hi guys, we had a docker agent fail over the weekend, so there were like 100 late scheduled runs. As soon as I restarted the agent, it immediately queued and started all 100 runs. These are pretty expensive tasks, so firing them all up at once immediately ground the server to a halt, and I couldn't ssh in to try killing things. So my questions coming out of this are:
    • How do you configure the scheduled runs to just give up if they get missed? We run the task every hour, so it's fine to just run it the next hour instead. Having it give up after, say, 30 minutes of waiting would be very elegant, but just failing immediately is perfectly fine.
    • How do you mass-cancel scheduled jobs that have missed their time?
    • How do you mass-cancel running jobs? I manually clicked open every job and pressed cancel until the system started responding enough to ssh in and start killing things via the docker cli. With how unresponsive the UI is in general, this was pretty painful.
    • Is there some way to kill the agent to stop everything? This is the idea of some kind of breaker switch -- prefect just started going crazy on this server, shut it all down so we're back to a working state, and then we can sort it out afterwards.
    • After all the jobs had been marked as Cancelled or Failed, there were still containers running on the server. Are these just all the Failed containers that lost their heartbeat? Is there a nicer way to clean them up, or are Failed containers just abandoned by Prefect at that point, and the answer is to go in and manually remove them like I did? Does that mean that any Failed tasks should always be manually reviewed for hanging containers that need to be stopped and removed?
    • How do you configure a limit for the agent, so it can't get bad enough to make the server inaccessible to repair? I know based on tags that this agent is only going to run jobs of a certain type, so I could set the limit to, say, 5, and it'd just have 5 running and 95 queued, and then I'd just need to mass-cancel those queued jobs.
    👀 1
    n
    x
    • 3
    • 17
  • x

    xyzz

    04/20/2021, 8:45 AM
    Does the cost calculator at https://www.prefect.io/pricing/ work for anyone? I can't enter numbers higher than 9999
    k
    n
    • 3
    • 9
  • x

    xyzz

    04/20/2021, 8:57 AM
    Also, it would be great if the blue keywords showed a detailed description of the feature on mouse-over, or linked to a page with it.
  • r

    Rob Fowler

    04/20/2021, 9:01 AM
    I just ran one of my new flows in production: scripts run on 3500 machines
    👍 7
  • d

    Dotan Asselmann

    04/20/2021, 1:37 PM
    hey everyone, is it possible to capture default 'logging' logs on a Prefect run? We're using many libraries that emit valuable logs but don't have access to the Prefect logger. Instead of passing the Prefect logger object around, we want to be able to use the global Python logging module.
    k
    • 2
    • 1
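    A sketch of the usual approach for this (the same extra-loggers mechanism discussed earlier in the channel), with placeholder logger names and a generic run config standing in for whatever is actually in use:
    from prefect import Flow
    from prefect.run_configs import UniversalRun

    with Flow("extra-loggers-example") as flow:
        ...

    # Route records from the named stdlib loggers through Prefect's handlers.
    # The names are placeholders; the value is a string-formatted list.
    flow.run_config = UniversalRun(
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['snowflake.connector', 'boto3']"}
    )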
  • p

    Pedro Machado

    04/20/2021, 2:45 PM
    Hi everyone. I am working with a data science team that wants to orchestrate some workflows. They already have scripts that run inside of a docker container. I was thinking that the easiest way to add these to the flow would be to use the Docker Tasks (CreateContainer, StartContainer, etc.). However, it seems that the logs would need to be retrieved after the container finishes running with GetContainerLogs. Is this correct? If so, this is less than ideal for our use case, given that these are long-running processes and we'd want to see the logs in real time. So far, I've thought about a couple of alternatives:
    1. Modify the Docker Tasks to use the logs method with stream=True (I haven't tested this yet, but the docs suggest this could work)
    2. Add prefect to their docker image and create a flow that runs inside of the image
    Do you see another option? What would you recommend? Thanks!
    👀 1
    n
    • 2
    • 3
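    A sketch of option 1 above, done with the docker SDK directly inside a custom task rather than by modifying the built-in Docker Tasks (the image name is a placeholder), so container output reaches the Prefect logger line by line while the container is still running:
    import docker
    import prefect
    from prefect import Flow, task

    @task
    def run_container_with_streamed_logs():
        logger = prefect.context.get("logger")
        client = docker.from_env()
        # detach=True returns a Container object immediately instead of blocking.
        container = client.containers.run("my-team-image:latest", detach=True)
        # Stream stdout/stderr as the container produces it.
        for line in container.logs(stream=True, follow=True):
            logger.info(line.decode().rstrip())
        result = container.wait()
        if result.get("StatusCode", 1) != 0:
            raise RuntimeError(f"container exited with {result}")

    with Flow("streamed-container-logs-sketch") as flow:
        run_container_with_streamed_logs()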
  • k

    Kelby

    04/20/2021, 3:22 PM
    Is it possible to change the start time of a scheduled workflow?
    n
    • 2
    • 4
  • k

    Kayvan Shah

    04/20/2021, 6:29 PM
    Is there any way to define which task executes first, so that the second task runs only after the first task has finished? Both of these tasks load the same data from a loader function.
    👀 1
    n
    k
    • 3
    • 8
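    A minimal sketch of enforcing that ordering even when no data passes between the tasks (task names here are placeholders), using the upstream_tasks keyword in the functional API:
    from prefect import Flow, task

    @task
    def first_task():
        return "loaded"

    @task
    def second_task():
        return "done"

    with Flow("ordering-sketch") as flow:
        a = first_task()
        # second_task waits for first_task even though it takes no data from it.
        b = second_task(upstream_tasks=[a])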
  • m

    Mitchell Bregman

    04/20/2021, 7:48 PM
    Hello, I am using KubernetesRun for the flow run configs. I am trying to specify a service_account_name, but it seems the pods are still using the default service account for the namespace. My agent is also deployed with an undefined service_account_name. Any thoughts?
    flow.run_configs = KubernetesRun(
        image_pull_secrets=["xxx"],
        job_template=config.KUBE_JOB_TEMPLATE,
        cpu_limit=config.KUBE_CPU_LIMIT,
        cpu_request=config.KUBE_CPU_REQUEST,
        memory_limit=config.KUBE_MEMORY_LIMIT,
        memory_request=config.KUBE_MEMORY_REQUEST,
        service_account_name="my-service-account",
    )
    k
    t
    • 3
    • 8
  • g

    Gage Toschlog

    04/20/2021, 9:14 PM
    Hello. I’m attempting to use the DbtShellTask to execute dbt commands and stream the DbtShellTask output to the Prefect flow logs. This is working locally when I start a local agent and register and run the flow, but the same code is not streaming the output when executing inside a docker container. Any suggestions?
    from prefect import Flow, task
    from prefect.client import Client
    from prefect.tasks.dbt import DbtShellTask

    @task
    def run_DBT_task():
        dbt_task = DbtShellTask(
            profile_name='snowflake',
            environment='sandbox',
            profiles_dir='.',
            stream_output=True,
            return_all=True
        )
        dbt_task.run(command="cd ~/src/prefect && dbt run")

    with Flow(name="Local Test") as flow:
        run_DBT_task()

    flowID = flow.register(project_name="51", labels=["localtest"])

    client = Client()
    client.create_flow_run(flow_id=flowID)
    👀 1
    n
    s
    • 3
    • 25
  • a

    Aurélien Vallée

    04/21/2021, 10:16 AM
    There is something that is a bit unclear to me regarding the prefect loggers. What problem does it solve? What is the difference between:
    @task
    def mytask():
        logger = prefect.context.get('logger')
        logger.info('hello')
    and
    logger = logging.getLogger(__name__)

    @task
    def mytask():
        logger.info('hello')
    i.e. What is the point of using a prefect-provided logger instead of regular python logging loggers?
  • a

    Aurélien Vallée

    04/21/2021, 10:17 AM
    Is it just a way to have per-task loggers? Is there no difference? Basically I'm running into issues of duplicated log messages with Prefect loggers, which is kind of annoying. I'm wondering if the whole concept of using these loggers instead of standard Python loggers is actually useful.
  • a

    Aurélien Vallée

    04/21/2021, 10:18 AM
    and the doc is quite unclear as to why these prefect loggers exist, it just mentions that they exist so I assumed it's better to use them, but I have no idea what they actually do
    n
    • 2
    • 2
  • r

    Rob Fowler

    04/21/2021, 10:32 AM
    Would there be a recommended way for a client to block waiting for a flow to complete? I am sending the task request with a combo of GraphQL and the client functions (like prefect flow run), but I would like the endpoint to block for a while for shorter tasks. Just looking for ideas. (I have Pulsar integrated, so I might simply subscribe and have a task at the end of the flow publish, unblocking the client.)
    k
    • 2
    • 4
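    One possibility for the question above, sketched under the assumption that polling is acceptable for short flows: kick off the run via the Client and poll its state until it finishes.
    import time

    from prefect.client import Client

    def wait_for_flow_run(flow_id: str, timeout_seconds: int = 300) -> str:
        client = Client()
        flow_run_id = client.create_flow_run(flow_id=flow_id)
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            state = client.get_flow_run_info(flow_run_id).state
            if state.is_finished():
                return str(state)
            time.sleep(5)
        raise TimeoutError(f"flow run {flow_run_id} did not finish in time")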
  • c

    ciaran

    04/21/2021, 11:55 AM
    Hey folks! Does anyone have an example repo they could share that's handling the deployment of a Prefect Agent with AKS? Currently starting off with spinning up a cluster with Terraform, but my k8s skills are sub-par, so some useful starters would be handy. For reference, I've CDK'd up a Prefect Agent & respective cluster in ECS on AWS before, but that didn't involve k8s.
    t
    k
    • 3
    • 248
  • j

    Jérémy Trudel

    04/21/2021, 1:34 PM
    Hi folks! I'm wondering if there's a way to code a Task that would be mapped, but initialized only once. I want to map a rollbar task onto a result (list) from a previous task, but ideally I would like to call the rollbar.init() only once. Anyone know how I could do that?
    k
    • 2
    • 4
  • m

    Marc Lipoff

    04/21/2021, 2:49 PM
    I'm running into an issue where I cannot build my flow (using docker storage). I am running:
    prefect build -p /home/circleci/project/data_flows/flows/vaccine_cdc.py
    (log in thread)
    m
    k
    • 3
    • 33
  • k

    Kayvan Shah

    04/21/2021, 4:35 PM
    sample.py
    n
    • 2
    • 5
  • s

    Sveta Kochkaeva

    04/21/2021, 4:52 PM
    Hi guys! Happy to join your community! Looking for ways to put my 'Data mess' in order
    👋 4
    n
    k
    • 3
    • 2
  • m

    Mark McDonald

    04/21/2021, 5:18 PM
    I have a single flow where I want to limit concurrency to one flow run at a time for business logic reasons. I see that in Prefect Cloud I can limit concurrency at a Flow level using labels. Currently we use labels to tag flows by team (business-intelligence, data-engineering, machine learning, etc) and by environment (uat, prod) for routing to the appropriate agent. Is there any other way of limiting flow concurrency based on something other labels (e.g. flow name and project)? I can make it work, but I'm just not crazy about the idea of adding another label that would specify flow name.
    n
    • 2
    • 13
n

nicholas

04/21/2021, 5:27 PM
Hi @Mark McDonald - at the flow level that's the only way we handle concurrency at the moment; another option you do have is if there are specific tasks in the flow that are the real reason for the concurrency limit, you can add tags to those tasks and not interfere with your flow-level labels
m

Mark McDonald

04/21/2021, 5:27 PM
got it. Thanks for the confirmation, @nicholas
n

nicholas

04/21/2021, 5:28 PM
I think you make a good argument that your expectations for labels sort of conflict with how they're used in the concurrency model though; if you want to open a discussion ticket and describe how you might like this to work, I think we'd be open to exploring some different methods here
m

Mark McDonald

04/21/2021, 5:29 PM
will do
👍 1
@nicholas https://github.com/PrefectHQ/prefect/discussions/4429
n

nicholas

04/21/2021, 5:50 PM
That's great @Mark McDonald! Thank you
m

Mark McDonald

04/21/2021, 9:38 PM
@nicholas - I'm trying to implement my own concurrency limit through a couple of calls to the GraphQL API. Basically, at the start of my flow I'm just going to query the GraphQL API, see if there are any other active runs, and stop the current run if there are. I would do this by making these API calls: 1. Fetch the most recent flow runs, not including the current flow run (I need to figure out how to filter the current flow run out)
{
  flow_run(where: {flow_id: {_eq: flow_id}}, limit: 1, order_by: {created: asc}) {
    id
  }
}
2. I'd then check the flow_run_state for the most recent flow run and make sure that status is success, failed, or cancelled
{
  flow_run_state(where: {flow_run_id: {_eq: flow_run_id}} limit: 1, order_by: {timestamp: desc}) {
    state, timestamp
  }
}
Does this make sense to you? Is there a better way of querying the graphql api to see if there are any active runs for flow, regardless of flow version?
n

nicholas

04/21/2021, 9:59 PM
Hi @Mark McDonald - try something like this:
query {
  flow_run(
    where: {
      flow_id: { _eq: <<flow_id from context>> },
      flow_run_id: { _neq: <<flow_run_id from context>> },
      state: { _in: ["Running", "Submitted"] }
    }
  ) {
    id
    state
  }
}
Sorry if the formatting is bad there, on mobile currently
That'll get you back any runs for the given flow that are in running or submitted states except for the current flow run; the presence of any of those should tell you not to proceed
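For completeness, a sketch of how a check like this might be wired into the start of a flow (the task name is illustrative, id is assumed to be the flow_run primary-key column for the _neq filter, and raising SKIP is just one way to bail out):
import prefect
from prefect import task
from prefect.client import Client
from prefect.engine.signals import SKIP

@task
def abort_if_already_running():
    # Look for other active runs of this flow, excluding the current run.
    query = """
    query($flow_id: uuid, $flow_run_id: uuid) {
      flow_run(
        where: {
          flow_id: {_eq: $flow_id},
          id: {_neq: $flow_run_id},
          state: {_in: ["Running", "Submitted"]}
        }
      ) {
        id
        state
      }
    }
    """
    result = Client().graphql(
        query,
        variables={
            "flow_id": prefect.context.flow_id,
            "flow_run_id": prefect.context.flow_run_id,
        },
    )
    if result.data.flow_run:
        raise SKIP("another run of this flow is already active")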
m

Mark McDonald

04/21/2021, 10:01 PM
awesome, this makes sense - thanks. I'll try it now
this works well, thanks again @nicholas
n

nicholas

04/22/2021, 12:52 PM
Great! 😁