Richard Hughes
08/18/2020, 7:46 PM
Richard Hughes
08/18/2020, 7:47 PM
josh
08/18/2020, 8:30 PM
0.13.3
and here are a few notable changes:
🌲 More control over log levels
📦 Pass any arguments to Docker tasks
🔑 Fixed caching when no cache_key is present
A big thank you to our contributors who helped out with this release! Full changelog:
bral
08/18/2020, 9:03 PM
from prefect import task, Flow, case, Task
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
class Preprocess:
    def __init__(self, number):
        print(number)
        self.number = number
        self.data = None
    def read_file(self):
        self.data = self.number
    def preprocess_file(self):
        self.data = self.data * 2
    def save_file(self):
        self.data = self.data / 3
@task
def is_running():
    return False
@task
def get_files():
    return [i for i in range(0, randint(0, 15))]
@task
def _print(lst):
    print(lst)
    print(len(lst))
@task
def etl(file):
    prep = Preprocess(file)
    prep.read_file()
    prep.preprocess_file()
    prep.save_file()
# Address of the running Dask scheduler (8786 is Dask's default scheduler port)
executor = DaskExecutor(address="tcp://localhost:8786")
schedule = IntervalSchedule(interval=timedelta(seconds=5))
local = LocalEnvironment(executor=executor)
with Flow("process", schedule=schedule, environment=local) as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        etl.map(files)
flow.run()
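For comparison, here is a plain-Python sketch of what `etl.map(files)` asks for: the same `read_file` -> `preprocess_file` -> `save_file` chain applied to each file independently. It uses a standard-library thread pool as a stand-in for the two Dask workers (an assumption made only so the sketch runs without a cluster; it is not Prefect's executor), and, unlike the `etl` task above, it returns each result so there is something to inspect.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


class Preprocess:
    """Same three-step pipeline as in the question (the print is omitted)."""

    def __init__(self, number: int) -> None:
        self.number = number
        self.data = None

    def read_file(self) -> None:
        self.data = self.number

    def preprocess_file(self) -> None:
        self.data = self.data * 2

    def save_file(self) -> None:
        self.data = self.data / 3


def etl(file: int) -> float:
    # Run the three steps for a single "file" and hand back the result.
    prep = Preprocess(file)
    prep.read_file()
    prep.preprocess_file()
    prep.save_file()
    return prep.data


def run_mapped(files: List[int], max_workers: int = 2) -> List[float]:
    # Fan etl() out over the inputs; two workers mirror the two Dask workers.
    # pool.map yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(etl, files))


print(run_mapped([0, 1, 2, 3]))
```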
According to the tutorial at https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html, I started the scheduler and 2 workers, and afterwards looked at the Bokeh web UI. But no tasks show up in the UI.
Will Goldstein
08/18/2020, 9:03 PM
Will Goldstein
08/18/2020, 9:07 PM
`prefect server start` for the scheduler) it keeps firing up pods that try to connect to localhost:4200 for the GraphQL endpoint, and they fail because they can't reach localhost from inside a pod. Is there a way to specify the endpoint the pods should connect to (e.g. `kubernetes.docker.internal` instead)?
Will Goldstein
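On the question above about pods trying to reach localhost:4200: Prefect Core reads configuration overrides from environment variables named `PREFECT__SECTION__KEY`, with double underscores separating the levels. The sketch below is a toy re-implementation of that naming convention, not Prefect's actual config loader, showing how an override of a host value would replace the `localhost` default in the `host:port` endpoint. The key names `server.host` and `server.port` and their defaults are assumptions for illustration; check the config of the installed Prefect version for the real keys and for how the value reaches the pods.

```python
import os
from typing import Dict, Mapping, Optional

# Assumed defaults, mirroring the localhost:4200 endpoint from the question.
DEFAULTS: Dict[str, Dict[str, str]] = {
    "server": {"host": "http://localhost", "port": "4200"},
}


def load_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Dict[str, str]]:
    """Overlay PREFECT__SECTION__KEY variables onto the defaults (toy loader)."""
    env = os.environ if env is None else env
    # Copy the defaults so repeated calls never mutate DEFAULTS.
    config = {section: dict(values) for section, values in DEFAULTS.items()}
    for name, value in env.items():
        parts = name.split("__")
        # Only two-level names such as PREFECT__SERVER__HOST are handled here.
        if len(parts) == 3 and parts[0] == "PREFECT":
            section, key = parts[1].lower(), parts[2].lower()
            config.setdefault(section, {})[key] = value
    return config


def graphql_endpoint(config: Mapping[str, Mapping[str, str]]) -> str:
    server = config["server"]
    return f"{server['host']}:{server['port']}"


print(graphql_endpoint(load_config({})))  # http://localhost:4200
print(graphql_endpoint(load_config({"PREFECT__SERVER__HOST": "http://kubernetes.docker.internal"})))
# http://kubernetes.docker.internal:4200
```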
08/18/2020, 9:08 PM
Cody Kaiser
08/18/2020, 9:09 PM
Chris Marchetti [Datateer]
08/18/2020, 9:55 PM
dennis
08/19/2020, 3:11 AM
Klemen Strojan
08/19/2020, 7:46 AM
Failed to load and execute Flow's environment: ConnectionError(MaxRetryError("HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: /repos/<ORG>/<REPO> (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7ff5caf88668>: Failed to establish a new connection: [Errno -2] Name or service not known',))",),)
What are potential causes for this error?
When there is downtime on GitHub, we see `Failed to load and execute Flow's environment: GithubException(500, None)`.
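`[Errno -2] Name or service not known` is the resolver failing to look up `api.github.com` (a DNS problem on the machine running the flow), whereas `GithubException(500, None)` is an error returned by GitHub itself; both can be transient. A generic retry-with-exponential-backoff wrapper is one common way to ride out such blips. The sketch is plain Python and not part of Prefect's GitHub storage; `fetch` stands for whatever call loads the flow, and the attempt count and delays are arbitrary assumptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    fetch: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 1.0,
    # socket.gaierror and ConnectionError are both OSError subclasses.
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying on the given exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts - 1):
        try:
            return fetch()
        except retry_on:
            # Wait base_delay * 1, 2, 4, ... before the next attempt.
            sleep(base_delay * 2 ** attempt)
    return fetch()  # final attempt: let any error propagate to the caller


calls = {"n": 0}


def flaky() -> str:
    # Fails twice with a DNS-style error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("Name or service not known")
    return "flow loaded"


delays = []
print(retry(flaky, sleep=delays.append))  # flow loaded
print(delays)  # [1.0, 2.0]
```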
What part of the source code should I explore to understand this better?
x062Wyhdolq
08/19/2020, 8:33 AM
Nuno Silva
08/19/2020, 2:54 PM
`flow.run(...)`, but when I `flow.register(...)` and then run it from the UI, I get errors in the Dask workers in k8s saying they cannot connect to localhost:4200, which makes sense, since localhost:4200 has to be replaced by the IP where the Prefect server is running. How do I do that? (https://github.com/PrefectHQ/prefect/issues/3185)
Michael Ludwig
08/19/2020, 3:26 PM
`Task` or imported via `from prefect.utilities.logging import get_logger`.
We get some weird double logging with different formats. I am not sure if it is a pure Prefect issue or just something we are doing wrong on our end. Has anyone else seen something like this and found a solution?
[2020-08-19 15:21:50] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
Task 'load': finished task run for task with final state: 'Success'
[2020-08-19 15:21:51] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
Task 'extract': finished task run for task with final state: 'Success'
[2020-08-19 15:21:51] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Flow run SUCCESS: all reference tasks succeeded
[2020-08-19 15:21:52] INFO - prefect.orchestrator.utils | Flow state transition: <Running: "Running flow."> -> <Success: "All reference tasks succeeded.">
Flow state transition: <Running: "Running flow."> -> <Success: "All reference tasks succeeded.">
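A common cause of this pattern in Python's `logging` generally (offered as an assumption, not a confirmed diagnosis of this setup) is that each record is handled twice: once by a handler attached to the named logger, which produces the formatted `[time] LEVEL - name | message` lines, and again after the record propagates to the root logger, where a handler added elsewhere (for example by application code) uses a bare message format. The sketch reproduces that with the standard library only and shows one fix, turning off propagation on the named logger; `demo.TaskRunner` is a stand-in name.

```python
import io
import logging


def make_handler(stream: io.StringIO, fmt: str) -> logging.Handler:
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
    return handler


named_out, root_out = io.StringIO(), io.StringIO()

# Handler on the named logger, formatted like the first line of each pair above.
logger = logging.getLogger("demo.TaskRunner")
logger.setLevel(logging.INFO)
logger.addHandler(make_handler(named_out, "[%(asctime)s] %(levelname)s - %(name)s | %(message)s"))

# A second handler on the root logger with a bare format.
root = logging.getLogger()
root_handler = make_handler(root_out, "%(message)s")
root.addHandler(root_handler)

logger.info("Task 'load': finished task run")
# The record reached both handlers: once formatted, once bare.
print(named_out.getvalue().strip())
print(root_out.getvalue().strip())

# One possible fix: stop records from propagating up to the root logger.
logger.propagate = False
logger.info("Task 'extract': finished task run")
print(root_out.getvalue().count("extract"))  # 0: the root handler no longer sees it

root.removeHandler(root_handler)  # tidy up the demo handler
```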
Jonas Hanfland
08/19/2020, 4:01 PM
`BigQueryTask()` arguments at runtime? Basically, I would like to be able to change the `table_dest` argument based on an environment variable, so that I can save my query result to a different place depending on whether I run the flow locally or elsewhere.
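One way to express this is to compute the destination from the environment and pass the resulting string into the BigQuery task as `table_dest`. Reading the variable inside a small upstream task means it is evaluated where the flow actually runs, rather than where the flow was built and registered. The sketch covers only the selection logic; the variable name `FLOW_ENV` and the table names are hypothetical, and whether `table_dest` can be overridden per call depends on the `BigQueryTask` signature in the installed Prefect version.

```python
import os
from typing import Mapping, Optional

# Hypothetical mapping from deployment environment to destination table.
TABLE_DEST_BY_ENV = {
    "local": "my_dataset.query_result_dev",
    "production": "my_dataset.query_result",
}


def choose_table_dest(env: Optional[Mapping[str, str]] = None) -> str:
    """Pick the BigQuery destination table from the FLOW_ENV variable."""
    env = os.environ if env is None else env
    flow_env = env.get("FLOW_ENV", "local")  # default to the local/dev table
    try:
        return TABLE_DEST_BY_ENV[flow_env]
    except KeyError:
        raise ValueError(f"Unknown FLOW_ENV {flow_env!r}") from None


print(choose_table_dest({}))
print(choose_table_dest({"FLOW_ENV": "production"}))
```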
Thanks very much in advance.
Charles Leung
08/19/2020, 4:24 PM
Brian Mesick
08/19/2020, 5:14 PM
Will Goldstein
08/19/2020, 5:46 PM
Will Goldstein
08/19/2020, 6:09 PM
bral
08/19/2020, 8:43 PM
Michael Reeves
08/19/2020, 10:16 PM
from prefect import Flow, Parameter
# read_file, do_line and finish are tasks defined elsewhere
with Flow('file') as flow:
    name = Parameter('name')
    lines = read_file(name=name)
    for l in lines:
        do_line(l)
    finish()
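The loop above asks for a chain in which each line's step waits on the previous one. In plain Python that shape is a fold over the lines, where each call receives the previous call's result. The sketch below uses `functools.reduce` with stand-in functions (`read_file` and `do_line` here are hypothetical, not the tasks from the question). Keep in mind that inside the `with Flow(...)` block, `read_file(name=name)` returns a task rather than the lines themselves, so the number of lines is not known while the flow is being built.

```python
from functools import reduce
from typing import List, Optional


def read_file(lines_text: str) -> List[str]:
    # Hypothetical stand-in for the question's read_file task.
    return lines_text.splitlines()


def do_line(previous: Optional[str], line: str) -> str:
    # Hypothetical per-line step: it receives the previous step's result,
    # so each line is processed only after the one before it.
    prefix = "" if previous is None else previous + " -> "
    return prefix + line.upper()


def process(lines_text: str) -> Optional[str]:
    lines = read_file(lines_text)
    # reduce threads the result of each do_line call into the next one;
    # None is the starting value, and is returned unchanged for empty input.
    return reduce(do_line, lines, None)


print(process("a\nb\nc"))  # A -> B -> C
```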
I know there's a Prefect `map` functionality, but I didn't see an easy way to make each line's task depend on the previous line's task in an iterative manner.
Max Lei
08/19/2020, 10:57 PM
prefect backend server &
prefect agent start &
prefect server start &
Then I registered my flow using:
flow.register(project_name="ML")
Finally, I went to the web UI and clicked Quick Run, but it has been stuck there for a while now.
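The three commands above are all started in the background at the same moment, so one thing worth ruling out is that the agent came up before the server's API (which listens on localhost:4200 by default) was accepting connections. A small standard-library check like the one below can confirm whether anything is listening on that port. It only tests TCP reachability, not whether the agent has picked up the flow run, and the host, port, and timeouts are assumptions.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused, unreachable or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 4200)` could be run after `prefect server start` and before starting the agent.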
Is there any way to debug why this is happening?
raman
08/19/2020, 11:12 PM
Maxwell Dylla
08/20/2020, 12:09 AM
`@flow` decorator works well for me, but the class-based API confuses my IDE. For example, my IDE thinks the class-based result should be type `Task`:
from prefect import Flow, Task, task
@task
def func_task() -> str:
    return ""
class ClassTask(Task):
    def run(self) -> str:
        return ""
class_task = ClassTask()
with Flow("test_type_hints") as flow:
    func_result: str = func_task()
    class_result: str = class_task()  # Expected type 'str', got 'Task' instead
flow.run()
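At flow-build time, calling either task returns a task object rather than a string, so the `str` annotations above are nominal even for `func_task()`; the IDE is presumably reporting the declared return type of the class-based task's `__call__`. If the goal is only to quiet the IDE, `typing.cast` tells a static checker which type to assume without changing anything at runtime. The sketch uses toy stand-in classes (not Prefect's `Task`) so it runs on its own.

```python
from typing import cast


class Task:
    """Toy stand-in for a task base class whose __call__ returns a Task."""

    def __call__(self) -> "Task":
        return self

    def run(self) -> str:
        raise NotImplementedError


class ClassTask(Task):
    def run(self) -> str:
        return ""


class_task = ClassTask()

# cast() only informs the type checker; at runtime it returns its argument unchanged,
# so class_result is still the task object, not a str.
class_result = cast(str, class_task())
print(type(class_result).__name__)  # ClassTask
```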
Max Lei
08/20/2020, 4:09 AM
`flow.run()` everything works fine, but when I try to use the Prefect server, it complains about environment variables not being set up for AWS, such as: `KeyError: 'No access key found. Please set the environment variable AWS_ACCESS_KEY_ID.'`
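When an agent starts a flow run, the run may execute in a different process or, with Docker storage, inside a container that does not see the variables set in your shell, so credentials that worked for `flow.run()` have to be passed to the run explicitly. The sketch collects named variables from the current environment and formats them as `KEY=VALUE` pairs, failing loudly if one is missing. How those pairs are handed to the run (for example through an agent's environment-variable option) depends on the agent and Prefect version and is left as an assumption; the values below are placeholders.

```python
import os
from typing import Dict, List, Mapping, Optional, Sequence


def forward_env(names: Sequence[str], source: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the named variables from source (default: os.environ), or fail clearly."""
    source = os.environ if source is None else source
    missing = [name for name in names if name not in source]
    if missing:
        raise KeyError(f"Not set in the submitting environment: {', '.join(missing)}")
    return {name: source[name] for name in names}


def as_cli_pairs(env: Mapping[str, str]) -> List[str]:
    # KEY=VALUE strings, sorted for stable output.
    return [f"{key}={value}" for key, value in sorted(env.items())]


aws = forward_env(
    ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    source={"AWS_ACCESS_KEY_ID": "example-id", "AWS_SECRET_ACCESS_KEY": "example-secret"},
)
print(as_cli_pairs(aws))
```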
Is this because the flow runs are executed inside the Docker container?
Alfie
08/20/2020, 7:08 AM
Alfie
08/20/2020, 7:08 AM
Matias Godoy
08/20/2020, 12:22 PM
Julien Allard
08/20/2020, 1:51 PM
`Unexpected error: TypeError("Cannot map over unsubscriptable object of type <class 'NoneType'>: None...")`. The problem seems to come from a mapped task that outputs a pandas DataFrame.
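The message says the value being mapped over is `None`. One frequent way to get there (an assumption about this flow, not a diagnosis) is an upstream function that builds its result but never `return`s it, so Python returns `None` implicitly; a code path that skips the `return` has the same effect. The sketch uses plain lists instead of a pandas DataFrame so it runs without pandas, and adds a hypothetical `ensure_mappable` check that fails with a clearer message before mapping.

```python
from typing import Any, List, Sequence


def load_rows_buggy():
    rows = [{"id": 1}, {"id": 2}]
    rows.append({"id": 3})
    # Missing "return rows": the caller receives None.


def load_rows() -> List[dict]:
    rows = [{"id": 1}, {"id": 2}]
    rows.append({"id": 3})
    return rows


def ensure_mappable(value: Any) -> Sequence:
    """Fail early, with context, if value cannot be indexed element by element."""
    if value is None or not hasattr(value, "__getitem__"):
        raise TypeError(f"Expected a sequence to map over, got {type(value).__name__}")
    return value


print(load_rows_buggy())  # None
print([row["id"] for row in ensure_mappable(load_rows())])  # [1, 2, 3]
```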
Does anyone have any ideas on how to fix this problem or how to debug it further?
Richard Hughes
08/20/2020, 2:02 PM