Shruti Hande
12/09/2022, 9:13 AM
Clovis
12/09/2022, 10:11 AM
Is it possible to save only part of a JSON block? I did see nothing like that in the documentation.
To give some context: I have multiple concurrent tasks that all depend on a common block, each updating only a different part of it. Because of the concurrency, and since I don't want to lose data, I want to avoid having each task load the JSON block and save it right after with its updates, risking overwriting the block with missing values. So my question is: can I save only part of the JSON block? Or is there another solution (like a lock or something) to prevent block data loss with async parallel tasks?
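One way to sidestep the race entirely, sketched below on the assumption that each task owns a distinct part of the state: give every part its own JSON block, so no two tasks ever read-modify-write the same document (the shared-state-* block names are illustrative):

from typing import Dict, List

from prefect import flow, task
from prefect.blocks.system import JSON


@task
def save_part(part_name: str, data: dict) -> None:
    # Each task writes only its own block, so concurrent saves never
    # overwrite another task's data.
    JSON(value=data).save(name=f"shared-state-{part_name}", overwrite=True)


@flow
def read_combined(parts: List[str]) -> Dict[str, dict]:
    # Reassemble the parts on read instead of maintaining one shared document.
    return {p: JSON.load(f"shared-state-{p}").value for p in parts}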
Vadym Dytyniak
12/09/2022, 10:34 AM
Thomas Fredriksen
12/09/2022, 1:14 PM
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
When running this with the DaskTaskRunner, the task get_primes_from_split is scheduled first, then the dask future is_prime. Since get_primes_from_split is scheduled first, it gets higher priority, which causes the dask execution to lock up: dask waits for the task to complete before executing anything else, while get_primes_from_split is itself waiting for is_prime to complete, which will never execute at this point.
Toying around with priorities, I managed to get is_prime to execute:
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
This causes dask to schedule a few instances of get_primes_from_split, which in turn schedule all their instances of is_prime. is_prime executes properly and starts returning results, but get_primes_from_split doesn't seem to pick up execution afterwards.
I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the above?
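One standard Dask way around this kind of task-in-task deadlock (a hedged sketch, not verified with prefect_dask) is to secede from the worker's thread pool while waiting on the sub-futures, e.g. with distributed.worker_client in place of get_dask_client:

from typing import List

from distributed import worker_client
from prefect import task


@task
def get_primes_from_split(min_number: int, max_number: int) -> List[int]:
    # is_prime is the plain (non-task) function from the snippets above.
    if min_number % 2 == 0:
        min_number += 1
    # worker_client() secedes from the dask worker's thread pool while this
    # task blocks on its sub-futures, freeing the slot so is_prime can run.
    with worker_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        maybe_primes = client.gather(futures)
    return [value for value, flag in maybe_primes if flag]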
Jelle Vegter
12/09/2022, 2:42 PM
Rio McMahon
12/09/2022, 2:44 PM
"SLACK_WEBHOOK_URL".
Kendall Bailey
12/09/2022, 4:00 PM
Chris Gunderson
12/09/2022, 4:40 PM
Denis Sh
12/09/2022, 5:24 PM
storage_name = f"{self.account.name}-session"
try:
    self.session = JSON.load(storage_name)
except ValueError as e:
    self.logger.error(e)
    self.session = JSON(value={})
    self.session.save(name=storage_name)
    self.logger.info(f"created session storage ({storage_name=})")
but the flow keeps failing on this exception. How do I handle it gracefully?
ADDED: it seems the problem was capital letters in the block name! Otherwise the code works as intended. Fixed by changing the type of account.name in the model definition to constr(to_lower=True).
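For reference, a minimal sketch of that fix, assuming a pydantic model (the Account class here is illustrative):

from pydantic import BaseModel, constr


class Account(BaseModel):
    # Block document names must be lowercase, so coerce the name on validation
    # (constr(to_lower=True) requires pydantic >= 1.9).
    name: constr(to_lower=True)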
Edmund Tian
12/09/2022, 5:58 PM
Joshua Grant
12/09/2022, 6:04 PM
upload_options argument to the S3 block, which is great because I need to specify ServerSideEncryption; however, I'm receiving AccessDenied:
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
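Assuming the upload_options mentioned above is forwarded to boto3's upload ExtraArgs, the configuration might look roughly like this (bucket path and key are placeholders; this doesn't address the AccessDenied itself, which usually points at IAM or bucket policy rather than the block config):

from prefect.filesystems import S3

# Hypothetical sketch -- assumes upload_options maps to boto3 upload ExtraArgs;
# "ServerSideEncryption" and "SSEKMSKeyId" are standard boto3 PutObject keys.
s3_block = S3(
    bucket_path="my-bucket/prefect-flows",  # placeholder
    upload_options={
        "ServerSideEncryption": "aws:kms",
        "SSEKMSKeyId": "alias/my-key",  # placeholder
    },
)
s3_block.save(name="prod-s3", overwrite=True)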
Alex F
12/09/2022, 6:40 PM
17:06:30 INFO Agent1
Submitted for execution: PID: 3327309
17:06:31 ERROR execute flow-run
Failed to load and execute flow run: PermissionError(13, 'Permission denied')
John-Craig Borman
12/09/2022, 6:40 PM
Aniruddha Bharadwaj
12/09/2022, 7:50 PM
prefect deployment build ./data_source_flows/master_flow.py:master_flow -n master_flow -q test --storage-block github/flows --apply
Dillan Smith
12/09/2022, 8:15 PM
helm install prefect-orion-poc prefect/prefect-orion --values helm-orion-values.yaml
and my values file is:
namespaceOverride: helm-prefect
postgresql:
  auth:
    password: password
ingress:
  enabled: true
It outputs "Get the application URL by running these commands: http://prefect.local/", but that comes back as an unknown host. I'm pretty new to K8s, so any help is appreciated.
Lucas Cavalcanti Rodrigues
12/10/2022, 10:48 PM
Edmondo Porcu
12/11/2022, 2:23 PM
Nico Neumann
12/11/2022, 9:45 PM
Lucas Cavalcanti Rodrigues
12/11/2022, 10:15 PM
Emma Rizzi
12/12/2022, 9:39 AM
ENTRYPOINT ["./set_umask.sh"]
https://codeyarns.com/tech/2017-07-21-how-to-set-umask-for-docker-container.html#gsc.tab=0
Is there a way to override the entrypoint of the flow? Or any other solution to set umask?
Anco
12/12/2022, 12:23 PM
Joël Luijmes
12/12/2022, 1:48 PM
run_deployment.
Assuming that all prefect flows are contained in the same repository and are deployed to the same Orion server: are there any benefits/disadvantages to one over the other?
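For concreteness, a minimal sketch of the two options being compared, assuming Prefect 2's run_deployment (the deployment name below is hypothetical):

from prefect import flow
from prefect.deployments import run_deployment


@flow
def child():
    print("child work")


@flow
def parent():
    # Option 1: subflow call -- runs in the parent's process and on the
    # parent's infrastructure; the parent must import the child's code.
    child()
    # Option 2: trigger a pre-registered deployment -- the child runs as a
    # separate flow run on whatever infrastructure its deployment specifies.
    run_deployment(name="child/child-deployment")  # hypothetical name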
Jelle Vegter
12/12/2022, 2:01 PM
Santhosh Solomon (Fluffy)
12/12/2022, 2:14 PM
12:48:12.659 | ERROR | prefect.infrastructure.process - Process 'optimal-toucanet' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is caused by high memory usage causing the operating system to terminate the process.
Ashley Felber
12/12/2022, 3:00 PM
Adam Roderick
12/12/2022, 3:53 PM
Scott Chamberlain
12/12/2022, 4:20 PM
@task functions called from a @flow - all of our work is done outside of Python via make commands, which I currently use shell_run_command for. However, you can't put shell_run_command within a function decorated with @task. I could just remove the @task decorator, but is there a workflow folks use to leverage the @task decorator for running shell commands?
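One simple pattern, sketched here rather than taken from the thread: wrap the shell call in a plain subprocess inside your own @task instead of nesting the shell_run_command task:

import subprocess

from prefect import flow, task


@task
def run_make(target: str) -> str:
    # Plain subprocess call inside a @task; check=True makes a non-zero
    # exit code raise and fail the task.
    result = subprocess.run(
        ["make", target], capture_output=True, text=True, check=True
    )
    return result.stdout


@flow
def build_flow():
    run_make("all")  # hypothetical make target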
Sean Conroy
12/12/2022, 5:18 PM
from prefect import task, Flow

@task
def get_value():
    return 10

with Flow("task-results") as flow:
    v = get_value()
    print(v)

state = flow.run()

I get <Task: get_value> instead of 10. What am I doing wrong?
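For context, this is expected Prefect 1.x behavior: inside a with Flow(...) block, calling a task only adds it to the DAG and returns a Task reference, which is why print(v) shows <Task: get_value>. A sketch of reading the actual value from the run state (v and flow as defined above):

# Prefect 1.x: read the task's value from the final state after flow.run()
state = flow.run()
print(state.result[v].result)  # -> 10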
Scott Chamberlain
12/12/2022, 6:16 PM
prefect deployment build only allows pointing at one file. If you want to pull in imports from other files, like utility functions (not PyPI packages), how do you suggest doing that? I assume we don't have to have all code in the one file that the deployment build command points at?
Ashley Felber
12/12/2022, 6:32 PM
Ashley Felber
12/12/2022, 6:32 PM
Michael Adkins
12/12/2022, 6:50 PM
ECSTask as configured vs your pre-registered one.
Ashley Felber
12/12/2022, 6:55 PM
Michael Adkins
12/12/2022, 7:02 PM
Ashley Felber
12/12/2022, 7:09 PM
Michael Adkins
12/12/2022, 7:12 PM
Ashley Felber
12/12/2022, 10:08 PM
Michael Adkins
12/12/2022, 10:20 PM
stream_output, which means we need to configure a log stream, which can only be set per task definition, not per task run.
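A hedged sketch of the settings under discussion, assuming prefect-aws's ECSTask block (the block name is illustrative):

from prefect_aws.ecs import ECSTask

# stream_output requires CloudWatch logging on the task definition, which is
# why the log stream is fixed per task definition rather than per task run.
ecs = ECSTask(
    stream_output=True,
    configure_cloudwatch_logs=True,
)
ecs.save(name="prod-ecs", overwrite=True)  # illustrative block name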
12/12/2022, 10:42 PM
Michael Adkins
12/12/2022, 10:45 PM
stream_output set on your ECSTask?
Ashley Felber
12/12/2022, 10:46 PM
Michael Adkins
12/12/2022, 11:05 PM
Ashley Felber
12/12/2022, 11:18 PM
Michael Adkins
12/12/2022, 11:45 PM