prefect-community
  • m

    Marwan Sarieddine

    02/06/2022, 3:43 PM
    Hi folks, a question about prefect cloud automations. What does the “does not start” condition really mean? Does it wait to check until the flow run is in a “Running” state; or does a “Submitted” state equate to a “start”?
    a
    k
    • 3
    • 5
  • s

    Sergi de Pablos

    02/07/2022, 7:07 AM
    Hi people, a question about environments. How do you support different environments (development / testing / production) in your code, not only env variables, but also taking into account things like different environments using different executors?
    a
    • 2
    • 3
  • v

    vinoth paari

    02/07/2022, 8:02 AM
    Hi, I have installed 2.0a9, but I am unable to execute prefect commands such as prefect auth login --key **********8. I am getting no such command auth. Error: Prefect Server does not have authentication. Change your backend to Prefect Cloud with
    prefect backend cloud
    to log in.
    a
    • 2
    • 10
  • r

    rishika kumari

    02/07/2022, 8:06 AM
    Hello, I am Rishika, Product and Business Developer at COMETLABS. We are organizing our flagship hackathon, for which we would like to collaborate with Prefect on partnership/sponsorship. Could someone connect me to the relevant person? Thanks and Regards, Rishika, Product and Growth, COMETLABS
    👋 1
    a
    v
    c
    • 4
    • 3
  • c

    Carmen Marcos

    02/07/2022, 10:16 AM
    Hello everyone! During the past few weeks I have been learning how to use Prefect locally. However, now I am trying to deploy a flow using 4 OpenStack machines and I have some questions about the possible implementation:
    • Is it possible to make a flow of flows in which each flow executes on a specific machine of the 4 I have (without using dask)? Machines would share volumes between them.
    • On one of the machines I want to run a flow (triggered by another flow on a different machine) that must run a Docker container with a custom image when certain criteria are met. Could I use the Docker Agent with the Local Executor for that flow?
    Thank you in advance! 🙏
    a
    • 2
    • 4
  • c

    Carlos Paiva

    02/07/2022, 10:52 AM
    Hello! Is there a way to have concurrency limit on the agent? (Prefect Server)
    a
    k
    • 3
    • 5
  • r

    Ricardo Gaspar

    02/07/2022, 2:04 PM
    Hey there, maybe you can help me out here with some Prefect code and the rendered schematics. Context: I’m developing a flow that creates and submits simple Spark apps to EMR. To my surprise there are no built-in tasks for EMR (docs). Nonetheless, I am trying to use
    boto3
    and
    awswrangler
    APIs (as I’ve seen other users asking for it on the community channel). My current issue is that I don’t understand how flows are rendered in the Schematic view. It’s related to task dependencies and inter-task communication (passing values).
    a
    k
    • 3
    • 24
  • b

    Bruno Murino

    02/07/2022, 2:14 PM
    Hi everyone, I have an ECS Agent kicking off some ECS tasks on the underlying EC2 instances (we are not using Fargate), and sometimes the agent tries to start the ECS task on an EC2 instance that doesn't have enough memory available, so the flow fails to start. Is there any way to tell the ECS agent to try again or something?
    k
    a
    • 3
    • 9
  • f

    Fabrice Toussaint

    02/07/2022, 2:33 PM
    Hi all, I have a question regarding the `apply_map` function of Prefect. Is it possible to run this from within a task as well? I am getting the following error:
    ValueError("Couldn't infer a flow in the current context")
    k
    • 2
    • 3
  • r

    Ryan Brennan

    02/07/2022, 2:39 PM
    Hi everyone - I’m curious to hear how teams are organizing their
    Projects
    . Right now we just have one big project for all of our flows. Is there any benefit to breaking them out further? Is it possible to do things like “run all flows in project X” or are projects only for organizational aesthetics?
    k
    a
    • 3
    • 3
  • k

    Ken Nguyen

    02/07/2022, 5:17 PM
    Hi everyone, This may seem a bit strange but I’m planning to use a Google Sheet as a front end where users can type in inputs. Then, users can click the submit button to trigger a Prefect flow that will read the Gsheet’s inputs, transform it, then write it back onto the Gsheet. I’m wondering if anyone has tried something similar and can offer feedback on my approach:
    1. We have a button in Gsheets that will activate a Google Apps Script to do an API call
    2. The API call (hosted on AWS API Gateway) will trigger a Lambda function, which will trigger a Prefect flow
    3. Prefect flow will read data in Gsheets, transform the data, then write data in Gsheets
    Any suggestions or input is appreciated!
    a
    • 2
    • 6
  • h

    Heeje Cho

    02/07/2022, 5:20 PM
    Does task decorator currently support python generators? Trying to see if tasks play nice with yield.
    @task
    def generator(limit: int):
        logger = prefect.context.get("logger")
        for i in range(limit):
            logger.info(i)
            yield i*i
    k
    • 2
    • 5
  • n

    Nick Hart

    02/07/2022, 6:17 PM
    Hi, I’m trying to create a conditional Flow of Flows that runs a flow and then based on that flow’s final state either runs
    flow_if_success
    or
    flow_if_failure
    . Basically, I want to know how I can take the final state signal from
    conditional-flow
    and decide whether I want to run
    flow_if_success
    or
    flow_if_failure
    . I was having trouble matching the task signal with a value so that the case works properly. I was hoping you would be able to help me out. Would I need to use the
    get_task_run_result
    or is there a better way I can just grab the task result? Here is a snippet of my current test code:
    var1 = Parameter("var1", default = 4)
    var2 = Parameter("var2", default = 12)
    
    conditional_id = StartFlowRun(flow_name="Conditional-Flow", project_name="Test", wait=True)
    flow_if_success_id = StartFlowRun(flow_name="Flow_if_success", project_name="Test", wait=True)
    flow_if_failure_id = StartFlowRun(flow_name="Flow_if_failure", project_name="Test", wait=True)
    
    
    # Idea: Conditional flow runs first. If it succeeds with Success signal, run flow_if_success, if it fails with failure signal, run flow_if_failure
    with Flow("Conditional FoF") as parent_flow:
        conditional_run = conditional_id()
    
        with case (conditional_run, Success): #Conditional_run never matches Success even if successful task run
            flow_if_success_run = flow_if_success_id(parameters=dict(num=var1))
        with case (conditional_run, Failed): 
            flow_if_failure_run = flow_if_failure_id(parameters=dict(number=var2))
    a
    • 2
    • 13
  • a

    Andrea Haessly

    02/07/2022, 6:23 PM
    I'm still trying to understand when I can set task attributes during the task constructor (vs when a copy of the task is made and the values are not carried over). Will post code example in 🧵
    k
    • 2
    • 10
  • b

    Brian Phillips

    02/07/2022, 7:55 PM
    Is anyone aware of any CI vendors that follow a similar execution model to Prefect? i.e. one where the source code is never on servers owned by the vendor?
    t
    • 2
    • 2
  • t

    Tim Enders

    02/07/2022, 8:40 PM
    Getting this error on Prefect Cloud, but not locally.
    Clients have non-trivial state that is local and unpickleable.
    The load still seems to run OK, but it is marked as failure. How can I get rid of this error?
    k
    • 2
    • 2
  • d

    David Yang

    02/07/2022, 9:20 PM
    Hi All, how can I run local or Docker agent processes in the background on a Windows system, similar to Supervisor?
    k
    a
    • 3
    • 8
  • v

    Vamsi Reddy

    02/08/2022, 12:08 AM
    Hi all, is it possible to know the status of a flow with a particular run_name ? I want to query flow runs using python….we want to check if a flow is currently running else we will be creating a run for it.
    k
    • 2
    • 16
  • d

    Daniel Saxton

    02/08/2022, 1:19 AM
    What's the easiest way to cancel a flow based on a condition (like say you have a pipeline that checks for a file and only continues if it's found)?
    k
    • 2
    • 1
  • o

    Ovo Ojameruaye

    02/08/2022, 7:58 AM
    I have been running prefect server for the last month and my containers and services have stayed up and healthy. I am not sure what changed in the last few hours but my agents can no longer connect to the server and the UI no longer renders. I am not sure how to debug this. From what I can see, the services are still up. For the agents, Timeout exception, the UI tries to load but doesn't render. I can shut down and restart all containers but I would like to keep the metadata stored in the database. Can anyone help?
    a
    k
    • 3
    • 49
  • a

    Amichai Ben Ami

    02/08/2022, 12:16 PM
    Hi, I am trying to run Prefect on Azure AKS. We are using Helm with an external Postgres instead of the Postgres subchart. It used to work until a few days ago. When installing it, we get this in graphql:
    db-upgrade] Error applying Hasura metadata from /prefect-server/services/hasura/migrations/metadata.yaml
    I could not find anything that changed from our side. any idea? Thanks 🙏
    k
    • 2
    • 2
  • a

    Antonio Manuel BR

    02/08/2022, 12:58 PM
    Hello, I am executing one flow in a remote distributed dask cluster. Previously, I executed my code locally and gathered the flow results in the local easy way (e.g.
    subflow_res.result[deployed_model].result
    ). When I wrote the code, I read the docs, knowing this way was not valid for remote executions. I would like to know the proper way to gather results when working in a remote distributed environment. Can anyone help me?
    k
    • 2
    • 6
  • b

    Brett Naul

    02/08/2022, 1:19 PM
    is there (going to be) any way to run prefect 1.0 and 2.0 in the same installation for backwards compatibility? it'd be nice to have a way to experiment with one or two flows without mixing environments/branches
    k
    • 2
    • 5
  • t

    Thomas Pedersen

    02/08/2022, 2:37 PM
    Docker image prefecthq/prefect:0.15.13-python3.9 comes with google-cloud-aiplatform==1.9.0 and google-cloud-storage==2.1.0 preinstalled. But google-cloud-aiplatform==1.9.0 requires google-cloud-storage >=1.32.0,<2.0.0dev ...
    k
    m
    • 3
    • 3
  • b

    Bruno Murino

    02/08/2022, 2:54 PM
    Hi everyone — I need to send some Prefect logs to Elasticsearch, is there a way to modify the prefect logger to do that in addition to what it already does?
    k
    • 2
    • 2
  • a

    Archie Kennedy-Dyson

    02/08/2022, 3:09 PM
    Hey everyone, I'm looking to add a timeout to a task that has a max_retry on it. Does anyone know if the timeout is per retry or for all the retries combined?
    👍 1
    k
    • 2
    • 2
  • a

    Andrew Lawlor

    02/08/2022, 3:24 PM
    does anyone know how to optimize the memory and cpu request values for a KubernetesRun? I ran a flow and would like to see how much of each is required for it, so I can request the correct values next time
    k
    • 2
    • 1
  • m

    Matthew Seligson

    02/08/2022, 5:42 PM
    Are flow and task UUIDs unique even across tenants and projects? How are they seeded and could we expect collisions of task ID’s between tenants or projects?
    k
    • 2
    • 3
  • m

    Madison Schott

    02/08/2022, 8:19 PM
    Hi all, I'm looking to run a command within my Docker container in production in order to generate a slack notification for a dbt open-source package. Is there a specific type of task I would need to write for this within my Python file? The command is as follows:
    re_data notify slack \
    --start-date 2021-01-01 \
    --end-date 2021-01-31 \
    --webhook-url https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX \
    --subtitle="[Optional] Markdown text to be added as a subtitle in the slack message generated"
    k
    • 2
    • 27
  • t

    Tom Shaffner

    02/08/2022, 8:42 PM
    I have a case in which a LocalDaskExecutor task pulling data from a db gets the data and completes, but then never actually returns. It seems to stop without returning. I had a problem like this a release or three back in Prefect that resulted in a bug fix, but I haven't seen such an instance since. Is the issue back to our knowledge?
    k
    m
    • 3
    • 116
Related to the above, I turned on debug and added some debug statements around the end of the task. Here's the start of the logs as the flow began, and the second log is the point at which the task should have returned (but didn't).
logs (1).txt, logs.txt
This is the task in question:
@task(name='Pull Oracle Data',cache_for=timedelta(hours=10),timeout=(3*60*60),max_retries=3, retry_delay = timedelta(minutes=randint(5,25)),cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via(oracle_query_sql, server='INTP',cache_flag = False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount=len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount!=0 # Cause task failure if 0 is returned    
    logger.debug(f"Oracle pull completed; returning...")
    return df
And here's the flow:
with Flow(FLOW_NAME,result=LocalResult(),schedule=schedule) as flow:
    cache_invalid = Parameter('cache_invalid',default=False)
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH,server=ORACLE_SERVER,cache_flag = cache_invalid)
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u=upload_to_table(df, destination_table = DATA_DESTINATION_TABLE_NAME,append=False)    
    add_indices_to_table(upstream_tasks=[u],destination_table=DATA_DESTINATION_TABLE_NAME,column_list=INDEX_COLUMNS)

flow.run_config = LocalRun(env={'PREFECT__LOGGING__FORMAT':'[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
                                'PREFECT__LOGGING__LEVEL':'DEBUG',
                            },
        working_dir=file_path, labels=["normal-process"])

flow.executor = LocalDaskExecutor(scheduler='threads',num_workers=8)

flow.register(project_name=PROJECT_NAME,parameters={'cache_invalid':False},idempotency_key=(FLOW_NAME+datetime.now().strftime("%Y%m%d-%H%M%S")))
k

Kevin Kho

02/08/2022, 8:45 PM
It looks like Michael’s PR was added to version 1.0rc1. Not sure it’s related but are you on the 1.0rc1?
t

Tom Shaffner

02/08/2022, 8:45 PM
1.0rc?
I'm on prefect 0.15.13
Is that wrong? If I install via
pip install -U prefect
the above is what I get as the most current one
That PR does indeed look like the one that fixed this issue last time (or at least one with similar behavior) but I thought that was folded in here since 0.15.11 or so; I haven't seen the issue recently until I added this new flow today and it suddenly appeared again.
k

Kevin Kho

02/08/2022, 8:48 PM
Yep the release candidate for 1.0. It’s available as a pre-release so you can get it with
pip install prefect==1.0rc1
. The
-U
just gets the latest release (not pre-release)
t

Tom Shaffner

02/08/2022, 8:48 PM
Doesn't the 0.15.13 have the above fix already though?
Should I prefer the rc one over the current one? Given the name I assumed rc would be less stable; not so?
k

Kevin Kho

02/08/2022, 8:51 PM
You are right it should already be in 0.15.13 it seems
One sec looking at tracebacks
t

Tom Shaffner

02/08/2022, 8:52 PM
There's some log missing in the middle too; the above logs were the start and finish of the logs, but the missing piece wasn't relevant. (I don't know a way from the gui to download the full log; it seems to only download the chunk that's visible).
k

Kevin Kho

02/08/2022, 8:58 PM
Are you trying to avoid processes for the LocalDaskExecutor scheduler? Are you open to trying that? Does this issue happen consistently?
t

Tom Shaffner

02/08/2022, 8:59 PM
Not sure I understand the first question; you mean processes vs. threads? In this particular case I'm not averse to changing that; should I try it? And no, this is the first time I've encountered the issue again since the above-referenced fix.
And I have tried to see what's different about this flow vs. others that are very similar and I don't see any difference that might cause this
k

Kevin Kho

02/08/2022, 8:59 PM
Yes exactly I think processes might be more stable. Worth a shot.
m

Michael Adkins

02/08/2022, 9:00 PM
That PR has been released since 0.15.11
t

Tom Shaffner

02/08/2022, 9:01 PM
Restarting with processes; it'll take 20 minutes or so to see if it made a difference. And @Michael Adkins, agreed, and I haven't seen this issue since then until now.
It's a bit concerning to me that it occurred here though; most of my other flows are currently set to use threads; is it possible they'll start having this issue again?
I haven't had issues with it recently, but makes me nervous when I can't explain why I did here and not in the others
m

Michael Adkins

02/08/2022, 9:02 PM
It’s hard to say because I don’t understand what happened here yet either.
We’ll need the DEBUG level logs from the timeout function to diagnose where the deadlock occurs.
t

Tom Shaffner

02/08/2022, 9:04 PM
How do I get those again? I attached debug logs at the top; I remember last time you gave me some other command though to get more detailed debug logs; what was the process for that?
m

Michael Adkins

02/08/2022, 9:05 PM
It looks like the DEBUG logs in the top just don’t include the relevant section
t

Tom Shaffner

02/08/2022, 9:06 PM
Hmm, well as you can see though, Debug was clearly on as debug results came back. What do I need to do to turn on any other sections if that didn't do it?
In this case I turned on debug in the flow (visible in the....third post here I think it was?)
Correction; fourth post.
m

Michael Adkins

02/08/2022, 9:09 PM
It seems on but I do not see the full logs for the flow run?
t

Tom Shaffner

02/08/2022, 9:09 PM
What do I need to turn on to get them then? Or where can I pull that if there's somewhere else? (I'm just downloading from the GUI right now to get the above)
Sounds like maybe there's a debug error here too. 😏
m

Michael Adkins

02/08/2022, 9:11 PM
Seems like you’re only downloading a portion of what’s available as you mentioned above?
I don’t know how the download/pagination works in the UI off the top of my head.
t

Tom Shaffner

02/08/2022, 9:12 PM
Yes, in the gui you can't see the full log at once. You have to click "show older" or "show newer", and when you download (annoyingly) it downloads only the current page, not the whole log
m

Michael Adkins

02/08/2022, 9:12 PM
I think there’s an open issue for that
👍 1
t

Tom Shaffner

02/08/2022, 9:12 PM
I downloaded basically the head, so you could see the executor and such, and then paged down to the section that included the task in question
I'll see if I can download more to make sure I get them all; hold on...
m

Michael Adkins

02/08/2022, 9:12 PM
There should be more logs for the task in question that are prefixed with “Task ‘…’:”
We’re interested in the ones about the timeout handler and pickling results
t

Tom Shaffner

02/08/2022, 9:14 PM
To get this I picked "load older" until the top, and downloaded that. Then I clicked load newer, downloaded again (logs (1)) and then repeated (logs (2)) which should cover the entire period now
logs.txt, logs (1).txt, logs (2).txt
The end shows the cancellation which I triggered after I reached out here
I think we had some version of this issue last time too; you had me switch to trigger the flow manually with some manual command that turned on debug more to address it I think; maybe we need to do that again?
And if so, maybe there's another issue here that debug turned on at the flow level doesn't seem to propagate fully
The task in question here is imported from another module, so maybe that breaks the log level setting somehow?
m

Michael Adkins

02/08/2022, 9:17 PM
The relevant logs are
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10    prefect INFO    Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10    prefect DEBUG   Oracle pull completed; returning...
8 February 2022,03:31:10    prefect.CloudTaskRunner DEBUG   Task 'Pull Oracle Data': Execution successful.
We are reaching this line https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/executors.py#L299
t

Tom Shaffner

02/08/2022, 9:18 PM
Oh, was that in what I sent? Good. Sorry, I thought I'd included that in the second one at the top
m

Michael Adkins

02/08/2022, 9:18 PM
But not seeing this log https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/executors.py#L317-L319
t

Tom Shaffner

02/08/2022, 9:18 PM
Hmm, so something to do with the pickling again then?
m

Michael Adkins

02/08/2022, 9:18 PM
well.. we’re not even getting the pickling log
Is it possible your run is running into memory constraints or something?
t

Tom Shaffner

02/08/2022, 9:19 PM
I doubt it, but let me check
m

Michael Adkins

02/08/2022, 9:19 PM
Is your timeout being reached or are you cancelling it before then?
t

Tom Shaffner

02/08/2022, 9:20 PM
That's strange; basically the only stuff between those lines are the except blocks, but nothing gets thrown?
In one earlier version of this test the timeout did get hit and trigger a retry
It's a long timeout though; that's what flagged me originally to look into the task to discover this error. In the one I sent you above, I cancelled before the timeout was hit
m

Michael Adkins

02/08/2022, 9:20 PM
Yeah the lack of message is very peculiar from looking at the code, something weird must be happening.
t

Tom Shaffner

02/08/2022, 9:21 PM
Also, incidentally, the version with processes just got to the point of getting the data back and it's still running so the threads to process switch didn't seem to help
This is my available memory over the last 24 hours on this machine, so definitely not hitting memory limits
I haven't dropped below 20GB free memory at any point in the last day
m

Michael Adkins

02/08/2022, 9:24 PM
You can test this locally with
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Where name/project are replaced with your flow name/project
t

Tom Shaffner

02/08/2022, 9:25 PM
So....if it threw an error of any sort we should have hit one of those except blocks and gotten an error, right? Is it possible that it's failing ON line 317 somehow? And if so, maybe that's also causing issues in the related except block for it?
Maybe somehow getting the return_val value causes an error, causing both line 317 and the except to fail?
m

Michael Adkins

02/08/2022, 9:27 PM
I guess it’s possible?
t

Tom Shaffner

02/08/2022, 9:27 PM
Running now with the above local test line
m

Michael Adkins

02/08/2022, 9:28 PM
The
type(return_val)
call would have to fail which seems very unlikely
t

Tom Shaffner

02/08/2022, 9:28 PM
Is there another way that 299 would work but then nothing would happen? i.e. no debug line and also no error? I don't see another path for that to be possible.
m

Michael Adkins

02/08/2022, 9:28 PM
It’s possible that somehow the logs aren’t flushing from the subprocess
t

Tom Shaffner

02/08/2022, 9:28 PM
(not that I'm the authority on this though 😏 )
m

Michael Adkins

02/08/2022, 9:29 PM
So they are logging but we don’t get them
t

Tom Shaffner

02/08/2022, 9:29 PM
Right. Hopefully the run I just started, using your above command, will give us that though, right?
m

Michael Adkins

02/08/2022, 9:29 PM
It’s really hard to say though, a local run may reveal more since we don’t have to rely on the logs getting sent to cloud.
t

Tom Shaffner

02/08/2022, 9:29 PM
It's running now
m

Michael Adkins

02/08/2022, 9:30 PM
We’re using different timeout mechanisms in Orion so hopefully we don’t have this problem in the future 😄
❤️ 1
You seem to be an unlucky one, I thought I’d fixed this up
t

Tom Shaffner

02/08/2022, 9:30 PM
What's the timeframe for that?
hahaha, you did! I hadn't seen the issue again until today!
m

Michael Adkins

02/08/2022, 9:31 PM
It’s coming along quickly! Since we’re releasing 1.0 of current Prefect, we’re able to focus more attention on 2.0/Orion.
t

Tom Shaffner

02/08/2022, 9:32 PM
Is the RC more stable than the current version btw? Should I upgrade from the one I'm on to rc1 or wait still?
I'm pretty excited for orion. I have to say, it looks like I'll be able to restructure all my flows to have a LOT more code reuse; hoping that comes sooner. Every one of these flows I add now is another I'll have to restructure then.
m

Michael Adkins

02/08/2022, 9:33 PM
I’d say it’s less stable, it’s mostly the removal of old features.
👍 1
t

Tom Shaffner

02/08/2022, 9:33 PM
Good to know; I'll wait then
That flow is still running, btw, just waiting for it to return. Query has been taking 15-20 minutes today
m

Michael Adkins

02/08/2022, 9:36 PM
👍 lmk
t

Tom Shaffner

02/08/2022, 9:42 PM
Oooh, we did get more! Hold on...
logs.txt
The pickling size line did work
And then nothing...
That would suggest it's the dumps line where we're stopping
m

Michael Adkins

02/08/2022, 9:44 PM
What happens if you try to cloudpickle the result within your task?
t

Tom Shaffner

02/08/2022, 9:44 PM
Oooh, I'll bet this means I'm getting some new data type in this query that my other queries don't have. That's why I'm seeing the behavior now and not before
m

Michael Adkins

02/08/2022, 9:45 PM
140 MB is kind of big
t

Tom Shaffner

02/08/2022, 9:45 PM
Is it? It's a LONG way from the biggest I'm working with
m

Michael Adkins

02/08/2022, 9:45 PM
I guess not 🤷 I’m not sure what cloudpickle’s limitations are
It seems like it should be a fine size
t

Tom Shaffner

02/08/2022, 9:46 PM
But, last time we had this same issue, and the changes you added fixed it. So why would this one fail again?
m

Michael Adkins

02/08/2022, 9:48 PM
I cannot say. There were a lot of possible issues with the old implementation but I thought I’d conquered them all.
t

Tom Shaffner

02/08/2022, 9:51 PM
Added a test version that pickles in the task and restarting
New task:
@task(name='Pull Oracle Data',cache_for=timedelta(hours=10),timeout=(3*60*60),max_retries=3, retry_delay = timedelta(minutes=randint(5,25)),cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP',cache_flag = False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount=len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount!=0 # Cause task failure if 0 is returned    
    logger.debug(f"Oracle pull completed; returning...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug(f"Pickling successful")
    return df
Thinking through what else might have caused this, I do think this particular query returns fields that are likely longer than any other I've had to date.
i.e. in the pandas dataframe, a particular column will have elements with longer values than any other I've had; some could be very long
The test task just returned; same thing happens. It gets to the dumps line and then just stops
Further testing; I removed the columns that could be very long and the error went away. 😞 That basically means there's some unknown issue, likely a length limit, to the data I can get back from a database, and encountering the issue doesn't even return an error apart from a timeout. Thoughts on anything I could do to be able to keep that kind of data in here?
Or at the least to hammer down what the limit is?
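One way to narrow down which column is responsible is to pickle each column on its own and record its size, longest value, and elapsed time. The sketch below is only an illustration under stated assumptions: `probe_columns` is a hypothetical helper, the standard-library `pickle` stands in for `cloudpickle`, and a plain dict of lists stands in for the DataFrame. The column name is printed before each `dumps` call so that, if one column hangs, the last printed name points at it.

```python
import pickle
import time


def probe_columns(columns):
    """Pickle each column separately and report basic statistics.

    `columns` maps a column name to a list of values (a stand-in for
    the columns of a DataFrame). `probe_columns` is a hypothetical helper.
    """
    report = {}
    for name, values in columns.items():
        # Print first, so a hang inside dumps() still shows the column name.
        print(f"pickling column {name!r}...", flush=True)
        started = time.perf_counter()
        payload = pickle.dumps(values, protocol=pickle.HIGHEST_PROTOCOL)
        elapsed = time.perf_counter() - started
        report[name] = {
            "pickled_bytes": len(payload),
            "longest_value": max((len(str(v)) for v in values), default=0),
            "seconds": elapsed,
        }
    return report


# Made-up sample data: one short column and one with a very long value.
sample = {
    "id": [1, 2, 3],
    "notes": ["short", "x" * 100_000, "medium text"],
}
for column, stats in probe_columns(sample).items():
    print(column, stats)
```

Running the same probe with the suspect columns truncated to different lengths would be one way to look for a threshold, if there is one.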
m

Michael Adkins

02/08/2022, 10:37 PM
Just to clarify: It dumps correctly when you do it in your task rather than in the timeout handler?
I think we’re encountering this https://stackoverflow.com/questions/11854519/python-multiprocessing-some-functions-do-not-return-when-they-are-complete-que
The data is too large to place into a multiprocessing queue and it is causing a deadlock
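The failure mode described in the linked Stack Overflow question can be sketched as follows. This is an illustration only, not Prefect's timeout handler: `produce` and `run_and_collect` are hypothetical names, and the `fork` start method is an assumption that limits the sketch to POSIX systems. A child process that has put data on a `multiprocessing.Queue` does not exit until that data has been flushed to the underlying pipe, so a parent that joins the child before reading from the queue can wait forever once the payload is larger than the pipe buffer. Reading first and joining second avoids that.

```python
import multiprocessing as mp


def produce(queue, n_bytes):
    # Child process: put one large payload on the queue. The child cannot
    # exit until its feeder thread has flushed this data to the pipe.
    queue.put(b"x" * n_bytes)


def run_and_collect(n_bytes):
    # Assumption: "fork" is available (POSIX only).
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    child = ctx.Process(target=produce, args=(queue, n_bytes))
    child.start()

    # Drain the queue BEFORE joining. Calling child.join() first can
    # deadlock when the payload does not fit in the pipe buffer.
    payload = queue.get(timeout=60)
    child.join(timeout=60)
    return len(payload), child.exitcode


if __name__ == "__main__":
    size, exitcode = run_and_collect(5_000_000)
    print(f"received {size} bytes, child exit code {exitcode}")
```

Whether this is what happens in the flow run above is not established by the sketch; it only shows why large results and process-based handoff can interact badly.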
t

Tom Shaffner

02/09/2022, 2:43 PM
No, it didn't work in the task either, encountered the same error. It worked when I removed the columns that might have very long data.
This data regularly works fine in csv or excel format though.
Given the above, are there options here? In the meantime I'm stuck basically excluding some data from my process.
k

Kevin Kho

02/09/2022, 2:47 PM
If the issue is a multiprocessing deadlock, you may have to either go to a LocalExecutor, or break up the work unfortunately. Michael may have other ideas
m

Michael Adkins

02/09/2022, 3:15 PM
Oh if it didn’t work in the task either, it’s not a multiprocessing deadlock.
I’d report this as a bug to cloudpickle if you can make a minimal reproducible example, I don’t think pickle should hang like this.
You could attempt to serialize it another way. Perhaps by returning
df.to_json()
from your task instead of the raw data.
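A minimal sketch of that suggestion, assuming pandas is installed: the task would return a JSON string, and a downstream step would rebuild the DataFrame from it. `to_payload` and `from_payload` are hypothetical helper names and the sample data is made up. Dtypes and index types may not survive the round trip exactly, and nothing here shows that the hang in this thread would go away.

```python
import io

import pandas as pd


def to_payload(df):
    # Serialize the DataFrame to a JSON string; a str pickles as plain text.
    return df.to_json()


def from_payload(payload):
    # Wrap the string in StringIO, since newer pandas versions deprecate
    # passing literal JSON strings to read_json.
    return pd.read_json(io.StringIO(payload))


# Made-up sample with one very long text value.
df = pd.DataFrame({"id": [1, 2], "notes": ["short", "x" * 100_000]})
restored = from_payload(to_payload(df))
print(restored["notes"].str.len().tolist())
```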