scott
12/07/2022, 7:27 PM
Ilya Galperin
12/07/2022, 8:19 PM
Zachary Loertscher
12/07/2022, 9:17 PM
sql_server extra and pyodbc package). We are running Prefect 1.2.0.
Deployment is successful, and the flow is successfully registered (the imports in my .py
script run successfully), but Prefect Cloud can't find the package:
Failed to load and execute flow run: ImportError('Using prefect.tasks.sql_server requires Prefect to be installed with the "sql_server" extra.')
Is there a way to re-sync Prefect Cloud with my Docker container? It just seems like Prefect Cloud isn't finding the packages I have installed on my container.
Michael Cody
12/07/2022, 10:02 PM
prefect config set PREFECT_LOGGING_FORMATTERS_SIMPLE_DATEFMT="%Y-%m-%d %H:%M:%S"
which returns "Unknown setting name 'PREFECT_LOGGING_FORMATTERS_SIMPLE_DATEFMT'".
If I try prefect config set PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER=json
from this message from a month ago, I get the same error. This should be simple, but I think I'm missing something. Editing the logging config works, but I'd rather have it in the profile instead of copying the logging.yml file.
Thanks.
https://prefect-community.slack.com/archives/CL09KU1K7/p1666798948425739?thread_ts=1666794116.134359&cid=CL09KU1K7
Paige Gulley
12/07/2022, 10:39 PM
Mike Grabbe
12/07/2022, 11:31 PM
[ ] aren't being logged correctly. Has anyone else noticed this? Details in 🧵
wonsun
12/08/2022, 4:51 AM
Mahesh
12/08/2022, 6:22 AM
prefect get command to get logs.
Olivér Atanaszov
12/08/2022, 11:27 AM
on_handle argument of Flow)?
Sunjay
12/08/2022, 2:17 PM
Kelvin DeCosta
12/08/2022, 2:28 PM
flake8 (or any other linter) plugin to check for prefect-specific code issues.
The main mistake I find myself making is calling a task from another one.
I don't think a static type checker can detect this type of issue.
There are probably more issues waiting to be discovered, and I think it would benefit the community if there was a linting tool that accounted for prefect
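A minimal sketch of the kind of check such a plugin could perform, using only the stdlib ast module. Everything here is an illustration, not a real flake8 plugin: the decorator matching (a bare name equal to "task") is a simplifying assumption, and a real plugin would resolve imports and handle @task(...) calls with arguments.

```python
import ast


def find_task_in_task_calls(source: str):
    """Flag calls to a @task-decorated function made from inside another @task."""
    tree = ast.parse(source)
    # Naive assumption: a "task" is any function with a bare `task` decorator.
    task_names = {
        node.name
        for node in ast.walk(tree)
        if isinstance(node, ast.FunctionDef)
        and any(isinstance(d, ast.Name) and d.id == "task" for d in node.decorator_list)
    }
    issues = []
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name in task_names:
            for sub in ast.walk(node):
                if (
                    isinstance(sub, ast.Call)
                    and isinstance(sub.func, ast.Name)
                    and sub.func.id in task_names
                ):
                    issues.append((sub.lineno, sub.func.id))
    return issues


sample = """\
from prefect import task

@task
def child(x):
    return x + 1

@task
def parent(x):
    return child(x)  # task called from inside another task
"""
issues = find_task_in_task_calls(sample)
# issues -> [(9, 'child')]
```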
Nathan R
12/08/2022, 2:37 PM
Babak
12/08/2022, 5:20 PM
Ashley Felber
12/08/2022, 5:25 PM
Chris Gunderson
12/08/2022, 5:51 PM
Kyle McChesney
12/08/2022, 5:59 PM
None.
Slackbot
12/08/2022, 6:22 PM
merlin
12/08/2022, 6:22 PM
Sean Conroy
12/08/2022, 7:44 PM
run_migrations error using Prefect 2.7.0... full traceback in the reply. Anyone familiar with this?
Shruti Hande
12/09/2022, 9:13 AM
Clovis
12/09/2022, 10:11 AM
JSON block? I didn't see anything like that in the documentation.
To give some context, I have multiple concurrent tasks depending on a common block, each updating only a different part of it. Since I don't want to lose data, and because of the concurrency, I want to avoid loading the JSON block in each task, saving it right after with my updates, and risking overwriting it with missing values. So my question here is: can I save only part of the JSON block? Or maybe there is another solution (like a lock or something) to prevent block data loss when it comes to async parallel tasks?
Vadym Dytyniak
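One pattern for the read-modify-write concern above, sketched with stdlib asyncio only: serialize each load-update-save cycle behind a single lock so concurrent tasks can't clobber each other's fields. The block itself is stood in by a plain dict; shared_state and update_field are made-up names for illustration, not Prefect APIs.

```python
import asyncio

# Hypothetical stand-in for the shared JSON block's value.
shared_state = {"a": None, "b": None}
state_lock = asyncio.Lock()


async def update_field(key, value):
    # The whole read-modify-write cycle is one critical section, so a
    # concurrent update to a different key can never be overwritten.
    async with state_lock:
        current = dict(shared_state)   # "load" the block
        current[key] = value           # modify only this task's part
        shared_state.update(current)   # "save" it back


async def main():
    # Two concurrent tasks, each touching a different part of the block.
    await asyncio.gather(update_field("a", 1), update_field("b", 2))
    return shared_state


result = asyncio.run(main())
# result -> {"a": 1, "b": 2}
```

With a real JSON block, the same shape applies: do `JSON.load`, mutate, and `save` inside the locked section.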
12/09/2022, 10:34 AM
Thomas Fredriksen
12/09/2022, 1:14 PM
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes))
When running this with the DaskTaskRunner, the task get_primes_from_split is scheduled first, then the dask future is_prime. Since get_primes_from_split is scheduled first, it gets higher priority, which causes the dask execution to lock up, as dask is waiting for the task to complete before executing anything else. get_primes_from_split is naturally waiting for is_prime to complete, which unfortunately will not execute at this point.
Toying around with priorities, I managed to get is_prime to execute:
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes))
This causes dask to schedule a few instances of get_primes_from_split, which in turn schedule all their instances of is_prime. is_prime executes properly and starts returning its results, but it doesn't seem like get_primes_from_split picks up execution.
I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the above?
Jelle Vegter
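For what it's worth, the lock-up described above can be reproduced with nothing but the stdlib: a fixed-size worker pool whose task blocks on futures submitted to that same pool starves itself. Below is a minimal sketch of the shape of the problem and of the usual escape, sending the inner work to a separate pool. This is only an analogy; the actual Prefect/dask scheduling details differ.

```python
from concurrent.futures import ThreadPoolExecutor

# Inner work gets its own pool. If split_work instead submitted these to
# `outer` below (a 1-worker pool) and then blocked on .result(), the single
# worker would be waiting on work that can never start -- the deadlock shape
# described in the message above.
inner = ThreadPoolExecutor(max_workers=4)


def square(n):
    return n * n


def split_work(numbers):
    futures = [inner.submit(square, n) for n in numbers]
    return [f.result() for f in futures]  # safe: waits on a different pool


outer = ThreadPoolExecutor(max_workers=1)
result = outer.submit(split_work, range(4)).result()
# result -> [0, 1, 4, 9]
```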
12/09/2022, 2:42 PM
Rio McMahon
12/09/2022, 2:44 PM
"SLACK_WEBHOOK_URL".
Kendall Bailey
12/09/2022, 4:00 PM
Chris Gunderson
12/09/2022, 4:40 PM
Denis Sh
12/09/2022, 5:24 PM
storage_name = f"{self.account.name}-session"
try:
    self.session = JSON.load(storage_name)
except ValueError as e:
    self.logger.error(e)
    self.session = JSON(value={})
    self.session.save(name=storage_name)
    self.logger.info(f"created session storage ({storage_name=})")
but the flow keeps failing on this exception. How do I gracefully handle it?
ADDED: it seems the problem was capital letters in the block name! Otherwise the code works as intended.
Fixed by modifying the type in the model definition for account.name to constr(to_lower=True).
Edmund Tian
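The fix above can also be sketched without pydantic (the helper name here is made up for illustration): normalize the account name before deriving the block name, since block names reject capital letters.

```python
def session_block_name(account_name: str) -> str:
    # Block names must be lower-case, so normalize before load/save.
    return f"{account_name}-session".lower()


name = session_block_name("MyAccount")
# name -> "myaccount-session"
```

Pushing the normalization into the model with constr(to_lower=True), as done above, has the same effect at validation time.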
12/09/2022, 5:58 PM
Joshua Grant
12/09/2022, 6:04 PM
upload_options argument to the S3 block, which is great because I need to specify ServerSideEncryption; however, I'm receiving AccessDenied:
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
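For reference, a sketch of the kind of upload_options dict that ends up as boto3 ExtraArgs on the upload. The KMS key id is a placeholder, and whether the AccessDenied comes from a bucket policy requiring these headers or from missing kms: permissions on the role is something to check on the AWS side; this snippet can't fix that by itself.

```python
# Keys mirror boto3's allowed upload ExtraArgs for server-side encryption.
upload_options = {
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "<your-kms-key-id>",  # placeholder, not a real key id
}
```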