Martim Lobao
09/08/2021, 5:23 PM
An Hoang
09/08/2021, 6:49 PM
#input: Have a list of X: [X1, X2, X3, ...]
#input: total_times = 1_000_000
#property: x.do_work(1_000_000) = 1000 * x.do_work(1000)
#output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000), ...]
from prefect import task

@task
def long_running_task(x, n_times):
    result = x.do_work(n_times)
    return result
I have hundreds of Dask workers and want to split this work into units of x.do_work(1000) to get the output as fast as possible by maximizing parallelism. How should I write my mapping functions to achieve this?
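A minimal sketch of the flattening approach this question describes, written in plain Python so it runs without a Prefect backend. `build_map_args` and `combine` are hypothetical helpers. In a flow, the middle step would presumably be something like `partials = long_running_task.map(x=x_args, n_times=n_args)`, followed by a downstream task that calls `combine`. Summing the chunks assumes the per-chunk results are additive, which is consistent with the stated property.

```python
from typing import List, Sequence, Tuple

CHUNK = 1000  # one unit of work, i.e. x.do_work(1000)


def build_map_args(
    xs: Sequence[object], total_times: int, chunk: int = CHUNK
) -> Tuple[List[int], List[object], List[int]]:
    """Expand each x into total_times // chunk work units of size `chunk`.

    Returns three parallel lists that are already flat: the index of the
    owning x, the x to pass to the task, and the n_times to pass to the task.
    """
    if total_times % chunk != 0:
        raise ValueError("total_times must be a multiple of chunk")
    repeats = total_times // chunk
    owners = [i for i in range(len(xs)) for _ in range(repeats)]
    x_args = [xs[i] for i in owners]
    n_args = [chunk] * len(owners)
    return owners, x_args, n_args


def combine(num_xs: int, owners: Sequence[int], partials: Sequence[float]) -> List[float]:
    """Sum the per-chunk results back into one result per original x.

    Assumes chunk results are additive, consistent with the stated
    property x.do_work(1_000_000) = 1000 * x.do_work(1000).
    """
    totals = [0] * num_xs
    for i, value in zip(owners, partials):
        totals[i] += value
    return totals
```

Because the lists are built flat, no `flatten` call is needed. The `owners` list lets the reduce step regroup results by position, even if two entries of X compare equal.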
Do I just generate a list X_list = [x] * 1000 for each X and a list iteration_list = [1000] * 1000, then do long_running_task.map(x=flatten(X_list), n_times=flatten(iteration_list))?
Jose Chen
09/08/2021, 7:19 PM
David Jenkins
09/08/2021, 7:24 PM
random_string = GenerateRandomData(data=data)
3. random_string from flow1 will then be a parameter sent to flow2, so the parameter is dynamically set at runtime since the value of random_string is different with each execution of the three flows.
I learn best from reading code, so a basic example would be most helpful.
Erik Amundson
09/08/2021, 8:49 PM
import prefect

@prefect.task
def do_nothing(n):
    pass

with prefect.Flow("Dummy Flow") as flow:
    listy = list(range(200000))
    do_nothing.map(listy)
The scheduler pod runs out of memory after around 300 tasks; the screenshot of the Dask dashboard is attached.
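One common way to reduce the number of mapped children is to map over batches instead of individual items. This thread does not confirm that batching fixes the memory issue. In a flow, this would look roughly like `do_nothing_batch.map(chunked(listy, 1000))`, with `do_nothing_batch` decorated with `@prefect.task`. That turns 200,000 children into 200. `chunked` and `do_nothing_batch` are hypothetical names used for illustration.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive batches of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def do_nothing_batch(batch: List[int]) -> int:
    """Hypothetical batched task body: one mapped child handles a whole batch."""
    for n in batch:
        pass  # per-item work would go here
    return len(batch)
```

The trade-off is coarser retries and less granular progress in the UI, since a failure now affects a whole batch.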
Has anyone run into this issue or have any ideas for a fix? We normally run Prefect 0.14.16, but I've tried both 0.14.16 and the latest (0.15.5) with the same results.
Aric Huang
09/08/2021, 11:44 PM
prefect.context object or elsewhere?
• The flow ID used in the URL https://cloud.prefect.io/paravision/flow/<ID> seems to be different from the flow ID that's output when registering a flow. Is it possible to get that ID during a flow run through prefect.context? prefect.context.flow_id looks like the same one that's returned when registering the flow, but I would also like to get this other flow ID.
• When registering a flow, it increments the version number and archives the previous flow version. Is it possible to un-archive a previous version so it can be run again?
Gaylord Cherencey
09/09/2021, 5:48 AM
FileNotFoundError, No such file or directory while using Git storage with a private Git server. I set the Secret as an ENV variable and specify git_token_secret_name. Does this mean that the agent can connect to the repository but can't find the Flow Python file?
Eddie Atkinson
09/09/2021, 5:57 AM
StartFlowRun support passing parameters as lists of dictionaries in Python? I am currently getting the following error when I try to do that:
prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: https://api.prefect.io/
This is likely caused by a poorly formatted GraphQL query or mutation but the response could not be parsed for more details
I tried performing the same operation via the interactive GraphQL API, and I needed to escape the { and " characters to get the query to work.
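This small sketch makes the escaping concrete. It assumes the parameter value is an ordinary JSON-serialisable list of dictionaries; the `items` payload in the check is made up. It does not show how StartFlowRun itself builds its request. Encoding twice with `json.dumps` produces a quoted string literal with every inner double quote backslash-escaped. GraphQL string literals use the same escape sequences as JSON strings.

```python
import json
from typing import Any


def as_graphql_string_literal(value: Any) -> str:
    """Encode `value` as JSON text, then encode that text again as a quoted
    string literal so that every inner double quote is backslash-escaped."""
    return json.dumps(json.dumps(value))
```

For example, a parameter like {"items": [{"id": 1, "name": "a"}]} comes out starting with "{\"items\", and decoding it twice with json.loads recovers the original object.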
to get the query to work. Looking at the source it seems as though this doesn’t happen and might be the cause of my problem.Ben Muller
09/09/2021, 7:13 AMschedule = Schedule(
clocks=[
CronClock(
cron="*/7 * * * *",
parameter_defaults=dict(account_identifier="inrun"),
start_date=datetime.now(tz=timezone("Australia/Brisbane")),
)
)
with Flow(
name="betfair_flow",
storage=Storage().in_a_s3_bucket(),
run_config=RunConfig().fargate_on_ecs(cpu=512, memory=2048),
schedule=schedule,
executor=LocalDaskExecutor(scheduler="threads"),
) as flow:
betfair_account_identifier = Parameter("account_identifier")
Error in 🧵
Filip Lindvall
09/09/2021, 8:57 AM
def some_task(id: str, old: Dict, new: Dict) and I have data as [{"id": "some_id", "old": {...}, "new": {...}}, ...]. Is there some easy way to explode the dict into the tasks, or is it best to write a wrapper task whose only responsibility is to take the data and pass it to the intended task?
Issam Assafi
09/09/2021, 11:48 AM
Martim Lobao
09/09/2021, 12:19 PM
Run (attractive-bug) of flow 821098cf-c921-49ef-93d2-dd741b440aca failed example SLA (STARTED_NOT_FINISHED) after 60 seconds. See the UI for more details.
ee53f0cf-6d07-4dee-afb6-d1205baf141c
Rafael
09/09/2021, 12:29 PM
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

result = LocalResult(dir="./prefect-results")

@task
def hello_task(named_params):
    logger = prefect.context.get("logger")
    logger.info(named_params)
    return 'asd'

with Flow("hello-flow", result=result) as flow:
    named_params = Parameter("named_params")
    hello_task(named_params)

flow.run(named_params='param')
I was expecting to find a file in the ./prefect-results folder. The folder is there, but it is empty. I also tried:
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

@task(checkpoint=True, result=LocalResult(dir="./prefect-results"))
def hello_task(named_params):
    logger = prefect.context.get("logger")
    logger.info(named_params)
    return 'asd'

with Flow("hello-flow") as flow:
    named_params = Parameter("named_params")
    hello_task(named_params)

flow.run(named_params='param')
Also empty.
Am I doing something wrong, or have I misunderstood how this feature works?
Abhishek
09/09/2021, 12:54 PM
Michael
09/09/2021, 2:45 PM
{upstream} context option for a given task that somehow serializes the upstream dependencies into the result/target file name at run time. I know there is a {parameters} context value (so I could use this in the worst case), but not every task in a particular flow depends on all of that flow’s parameters, so this would result in unnecessary computation.
Right now I implement something like this inside the task’s run logic itself, but I imagine it’s a helluva lot slower than letting the orchestrator deal with skipping tasks rather than entering every task to perform a file existence check before exiting (and also a lot less clean).
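The idea of keying a target on the values a task actually receives from upstream, rather than on all flow parameters, can be sketched in plain Python. `upstream_target` is a hypothetical helper, not a Prefect API. Whether and how it could be wired into a task's target setting is not shown here.

```python
import hashlib
import json
from typing import Any, Mapping


def upstream_target(task_name: str, upstream: Mapping[str, Any], ext: str = "pkl") -> str:
    """Hypothetical helper: build a target file name that depends only on
    the task's upstream input values, not on every flow parameter."""
    # sort_keys makes the digest independent of keyword order;
    # default=str is a crude fallback for values json cannot encode natively.
    payload = json.dumps(upstream, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{task_name}/{digest}.{ext}"
```

Two runs whose upstream values match map to the same file name, so an existence check on that path could skip the task. A change to an unrelated flow parameter leaves the name untouched.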
Cheers for any help!
Sean Talia
09/09/2021, 2:53 PM
from prefect import Flow, task

@task
def custom_task_1(input) -> str:
    return f"Custom Input: {input}"

with Flow(...) as flow:
    custom_task_1_results = custom_task_1("Hello World!")
    custom_task_2 = CustomTask2(message=custom_task_1_results)
But when I look at my flow's schematic, it doesn't show custom_task_1 as being upstream of CustomTask2... I don't remember ever having run into this, so I feel like there's something quite obvious that I must be missing. I appreciate any help!
Alex
09/09/2021, 3:07 PM
ModuleNotFoundError. To make this work, will I need to create a Docker image that includes all the dependencies? Also, does this mean that I will have to use another option for storage?
Jack Sundberg
09/09/2021, 3:17 PM
Michael
09/09/2021, 6:04 PM
prefect.config.flows.checkpointing = True somewhere in the code). But I can’t for the life of me get it to work when I use my Docker agent that’s connected to Prefect Cloud. I set --env PREFECT__FLOWS__CHECKPOINTING=true as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I’m more than happy to provide more info as needed.
Jack Sundberg
09/09/2021, 7:00 PM
from prefect import Client

client = Client()
client.create_project(project_name="Hello, World!")
is_successful = client.delete_project(project_name="Hello, World!")
client.create_project(project_name="Hello, World!")  # FAILS HERE
The second call to create_project fails with a ClientError due to a uniqueness violation, but in raising that error, the client is unable to tell which existing project causes the uniqueness violation (because I just deleted it).
Would anyone have insight into what's going on here? I've tried sleeping between each line too, so I don't think this is a race condition, but I'm not totally sure.
joshua mclellan
09/09/2021, 7:17 PM
Maikel Penz
09/09/2021, 8:29 PM
NoCredentialsError('Unable to locate credentials') from the job execution.
In the past I used the ECS agent, and I was able to pass a task_role_arn to it. However, I see that neither the KubernetesAgent nor the Kubernetes run config KubernetesRun has a “role” parameter to set.
My next thought was that either the role attached to the EKS cluster or the Fargate profile should do the work, but as a test I gave admin to both and I still get the credentials issue.
What am I missing?
Constantino Schillebeeckx
09/09/2021, 8:47 PM
f"{os.path.dirname(os.path.realpath(__file__))}/layouts"; however, this causes the following (when executing from Prefect Cloud):
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
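One plausible cause, which is an assumption and not confirmed in this thread, is that the flow file is loaded by exec()-ing its contents rather than by importing it. Python sets `__file__` when it imports a module or runs a file as a script. Source passed to exec() with a fresh globals dict does not get it. The sketch below reproduces that error and shows a hypothetical `layouts_dir` fallback that could be called as `layouts_dir(globals())`. The working-directory fallback only helps if the layouts folder is reachable from wherever the flow actually runs.

```python
import os

# __file__ is set when Python imports a module or runs a file as a script;
# source text passed to exec() with a fresh globals dict does not get it.
SNIPPET = (
    "import os\n"
    'layout_dir = f"{os.path.dirname(os.path.realpath(__file__))}/layouts"\n'
)


def run_like_exec_loader(source: str) -> str:
    """Run `source` the way a loader that exec()s file contents might,
    returning the NameError message if one is raised."""
    namespace: dict = {}
    try:
        exec(source, namespace)
    except NameError as exc:
        return str(exc)
    return namespace["layout_dir"]


def layouts_dir(namespace: dict) -> str:
    """Hypothetical fallback: resolve relative to __file__ when it exists,
    otherwise relative to the current working directory."""
    base = namespace.get("__file__")
    root = os.path.dirname(os.path.realpath(base)) if base else os.getcwd()
    return os.path.join(root, "layouts")
```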
How come that global isn't available?
Abhas P
09/09/2021, 10:33 PM
sean williams
09/09/2021, 10:38 PM
"The LogRocket software has a feature that allows for the “video-like” replay of a previous user session"
It looks like disabling telemetry might not disable LogRocket's session recording? Am I interpreting that correctly, or am I missing a config option to disable it? If anything, LogRocket seems more invasive than GA telemetry. Please let me know if this belongs in a different channel. Thanks
Jacq Crous
09/10/2021, 7:23 AM
Michael
09/10/2021, 8:08 AM
Thomas Fredriksen
09/10/2021, 8:10 AM
Flow class so that all tasks will report state to an external service. From what I understand, subclassing the Flow class may not be a good idea, as the Prefect server will not use the subclass when managing the flows.
I like that Prefect has support for state change hooks (state_handlers), and I was wondering if I can use such a hook to modify all tasks of a flow in order to add task state handlers that will report the task state change.
Something like this:
def task_state_handler(task, old_state, new_state):
    report_state(task, old_state, new_state)  # poster's own reporting function (not shown)
    return new_state

def flow_state_handler(flow, old_state, new_state):
    was_pending = old_state.is_queued() or old_state.is_scheduled() or old_state.is_pending() or old_state.is_submitted()
    if was_pending and new_state.is_running():
        for task in flow.tasks:
            task.state_handlers.append(task_state_handler)
    return new_state
Posted in #prefect-server
Terawat T
09/10/2021, 8:50 AM
DEBUG:agent:Querying for ready flow runs...
[xx] DEBUG - agent | Creating namespaced job prefect-job-c4a440b5
DEBUG:agent:Creating namespaced job prefect-job-c4a440b5
[xx] DEBUG - agent | Job prefect-job-c4a440b5 created
DEBUG:agent:Job prefect-job-c4a440b5 created
[xx] INFO - agent | Completed deployment of flow run 549ced42-7840-43d6-89da-6f1a1f9cb378
INFO:agent:Completed deployment of flow run 549ced42-7840-43d6-89da-6f1a1f9cb378
Nacho Rodriguez
09/10/2021, 8:53 AM